[ https://issues.apache.org/jira/browse/SAMZA-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shanthoosh Venkataraman updated SAMZA-2167:
-------------------------------------------
    Summary: Should not close the MetadataStore at the end of ProcessJobFactory.  (was: Should not close the MetadataStore at the end of ProcessJobFactory .)

> Should not close the MetadataStore at the end of ProcessJobFactory.
> -------------------------------------------------------------------
>
>                 Key: SAMZA-2167
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2167
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> Currently, ProcessJobFactory closes the metadata-store connection after
> generating the JobModel.
> To read from coordinator stream only once in samza-yarn ApplicationMaster, we
> ended up making the LocalityManager, TaskAssignmentManager,
> ChangelogStreamManager etc. However, after the above change, closing the
> metadata store in ProcessJobFactory after generating the JobModel results in
> the following exception when the servlet API queries the JobModel:
> {code:java}
> org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
>     at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
>     at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
>     at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
>     at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
>     at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
>     at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
>     at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
>     at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
>     at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
>     at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
>     at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>     at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
>     at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
>     at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
>     at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
>     at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
>     at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>     at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
>     at org.eclipse.jetty.server.Server.handle(Server.java:497)
>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
>     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
>     at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
>     at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
>     at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
>     at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
>     at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
>     at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
>     at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
>     at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
>     at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
>     at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
>     at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
>     at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
>     at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
>     ... 25 more
> 2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] /
> org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
>     at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
>     at org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
>     at org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
>     at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
>     at org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
>     at org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
>     at org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
>     at org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
>     at org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
>     at org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
>     at org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>     at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>     at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
>     at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
>     at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
>     at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
>     at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
>     at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>     at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
>     at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
>     at org.eclipse.jetty.server.Server.handle(Server.java:497)
>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
>     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
>     at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.samza.SamzaException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped.
>     at org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
>     at org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
>     at org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
>     at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
>     at org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
>     at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
>     at org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
>     at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
>     at org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
>     at org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
>     at org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
>     at org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
>     ... 25 more
> 2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send response error 500: org.codehaus.jackson.map.JsonMappingException: samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy has stopped. (through reference chain: org.apache.samza.job.model.JobModel["all-container-locality"])
> {code}
> The above exception causes the local deployment to fail.
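
The failure mode in the trace (a getter on the JobModel reading from a store that has already been closed) can be reproduced in miniature. The sketch below is only an illustration under stated assumptions: SketchStore, SketchJobModel and LifecycleSketch are hypothetical stand-ins and are not Samza classes or the actual fix for this issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a metadata store; not Samza's MetadataStore. */
class SketchStore implements AutoCloseable {
    private final Map<String, String> data = new HashMap<>();
    private boolean closed = false;

    void put(String key, String value) {
        data.put(key, value);
    }

    /** Returns a copy of all entries, or fails if the store was closed. */
    Map<String, String> all() {
        if (closed) {
            throw new IllegalStateException("store has been closed");
        }
        return new HashMap<>(data);
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical model that reads locality from the store when its getter is called. */
class SketchJobModel {
    private final SketchStore store;

    SketchJobModel(SketchStore store) {
        this.store = store;
    }

    Map<String, String> getAllContainerLocality() {
        return store.all();
    }
}

class LifecycleSketch {
    /** Mirrors the reported behavior: the store is closed right after the model is built. */
    static SketchJobModel buildAndClose(SketchStore store) {
        SketchJobModel model = new SketchJobModel(store);
        store.close();
        return model;
    }

    /** Leaves the store open; the caller closes it once nothing reads the model anymore. */
    static SketchJobModel buildAndKeepOpen(SketchStore store) {
        return new SketchJobModel(store);
    }

    public static void main(String[] args) {
        SketchStore first = new SketchStore();
        first.put("container-0", "host-a");
        SketchJobModel broken = buildAndClose(first);
        try {
            broken.getAllContainerLocality();
        } catch (IllegalStateException e) {
            System.out.println("read after close failed: " + e.getMessage());
        }

        SketchStore second = new SketchStore();
        second.put("container-0", "host-a");
        SketchJobModel working = buildAndKeepOpen(second);
        System.out.println(working.getAllContainerLocality());
        second.close(); // closed only after the last read
    }
}
```

In this sketch the only difference between the two paths is who owns the close() call. Moving it from the factory to whoever outlives the last reader is one way to read the summary of this issue, though the real change may differ.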