[ https://issues.apache.org/jira/browse/BEAM-6932?focusedWorklogId=220594&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-220594 ]

ASF GitHub Bot logged work on BEAM-6932:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Mar/19 16:13
            Start Date: 29/Mar/19 16:13
    Worklog Time Spent: 10m
      Work Description: xinyuiscool commented on pull request #8163: [BEAM-6932] SamzaRunner: migrate to use new Samza 1.1.0 libraries
URL: https://github.com/apache/beam/pull/8163#discussion_r270479355

 ##########
 File path: runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
 ##########
 @@ -18,34 +18,67 @@
  package org.apache.beam.runners.samza.container;

  import java.time.Duration;
 -import org.apache.samza.application.StreamApplication;
 +import org.apache.samza.application.SamzaApplication;
 +import org.apache.samza.application.descriptors.ApplicationDescriptor;
 +import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 +import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
  import org.apache.samza.config.Config;
  import org.apache.samza.config.ShellCommandConfig;
 +import org.apache.samza.context.ExternalContext;
  import org.apache.samza.job.ApplicationStatus;
 -import org.apache.samza.runtime.LocalContainerRunner;
 +import org.apache.samza.runtime.ApplicationRunner;
 +import org.apache.samza.runtime.ContainerLaunchUtil;
 +import org.apache.samza.util.SamzaUncaughtExceptionHandler;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  /** Runs the beam Yarn container, using the static global job model. */
 -public class BeamContainerRunner extends LocalContainerRunner {
 +public class BeamContainerRunner implements ApplicationRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgFactory.class);

 -  public BeamContainerRunner(Config config) {
 -    super(ContainerCfgFactory.jobModel, System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
 +  private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
 +
 +  public BeamContainerRunner(SamzaApplication app, Config config) {
 +    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
 +  }
 +
 +  @Override
 +  public void run(ExternalContext externalContext) {
 +    Thread.setDefaultUncaughtExceptionHandler(
 +        new SamzaUncaughtExceptionHandler(
 +            () -> {
 +              LOG.info("Exiting process now.");

 Review comment:
   Makes sense. Let me put out another PR for this, given how hard it is to get all the checks passed :).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 220594)
    Time Spent: 1h  (was: 50m)

> SamzaRunner: migrate to use new Samza 1.1.0 libraries
> -----------------------------------------------------
>
>                 Key: BEAM-6932
>                 URL: https://issues.apache.org/jira/browse/BEAM-6932
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-samza
>            Reporter: Xinyu Liu
>            Assignee: Xinyu Liu
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Update SamzaRunner to use the latest Samza release libraries (1.1.0).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)