Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1642#discussion_r77414257
  
    --- Diff: 
storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java 
---
    @@ -0,0 +1,459 @@
    +package org.apache.storm.daemon.supervisor;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.container.ResourceIsolationInterface;
    +import org.apache.storm.generated.LocalAssignment;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Utils;
    +import org.junit.Test;
    +
    +public class BasicContainerTest {
    +    public static class CommandRun {
    +        final List<String> cmd;
    +        final Map<String, String> env;
    +        final File pwd;
    +        
    +        public CommandRun(List<String> cmd, Map<String, String> env, File 
pwd) {
    +            this.cmd = cmd;
    +            this.env = env;
    +            this.pwd = pwd;
    +        }
    +    }
    +    
    +    public static class MockBasicContainer extends BasicContainer {
    +        public final List<CommandRun> profileCmds = new ArrayList<>();
    +        public final List<CommandRun> workerCmds = new ArrayList<>();
    +        
    +        public MockBasicContainer(int port, LocalAssignment assignment, 
Map<String, Object> conf,
    +                String supervisorId, LocalState localState, 
ResourceIsolationInterface resourceIsolationManager,
    +                boolean recover) throws IOException {
    +            super(port, assignment, conf, supervisorId, localState, 
resourceIsolationManager, recover);
    +        }
    +        
    +        public MockBasicContainer(AdvancedFSOps ops, int port, 
LocalAssignment assignment,
    +                Map<String, Object> conf, Map<String, Object> topoConf, 
String supervisorId, 
    +                ResourceIsolationInterface resourceIsolationManager, 
LocalState localState,
    +                String profileCmd) throws IOException {
    +            super(ops, port, assignment, conf, topoConf, supervisorId, 
resourceIsolationManager, localState, profileCmd);
    +        }
    +        
    +        @Override
    +        protected Map<String, Object> readTopoConf() throws IOException {
    +            return new HashMap<>();
    +        }
    +        
    +        @Override
    +        public void createNewWorkerId() {
    +            super.createNewWorkerId();
    +        }
    +        
    +        @Override
    +        public List<String> substituteChildopts(Object value, int 
memOnheap) {
    +            return super.substituteChildopts(value, memOnheap);
    +        }
    +               
    +        @Override
    +        protected boolean runProfilingCommand(List<String> command, 
Map<String, String> env, String logPrefix,
    +                File targetDir) throws IOException, InterruptedException {
    +            profileCmds.add(new CommandRun(command, env, targetDir));
    +            return true;
    +        }
    +        
    +        @Override
    +        protected void launchWorkerProcess(List<String> command, 
Map<String, String> env, String logPrefix,
    +                ExitCodeCallback processExitCallback, File targetDir) 
throws IOException {
    +            workerCmds.add(new CommandRun(command, env, targetDir));
    +        }
    +        
    +        @Override
    +        protected String javaCmd(String cmd) {
    +            //avoid system dependent things
    +            return cmd;
    +        }
    +        
    +        @Override
    +        protected List<String> frameworkClasspath() {
    +            //We are not really running anything so make this
    +            // simple to check for
    +            return Arrays.asList("FRAMEWORK_CP");
    +        }
    +        
    +        @Override
    +        protected String javaLibraryPath(String stormRoot, Map<String, 
Object> conf) {
    +            return "JLP";
    +        }
    +    }
    +    
    +    @Test
    +    public void testCreateNewWorkerId() throws Exception {
    +        final String topoId = "test_topology";
    +        final int port = 8080;
    +        LocalAssignment la = new LocalAssignment();
    +        la.set_topology_id(topoId);
    +        
    +        AdvancedFSOps ops = mock(AdvancedFSOps.class);
    +        
    +        LocalState ls = mock(LocalState.class);
    +        
    +        MockBasicContainer mc = new MockBasicContainer(ops, port, la, new 
HashMap<String, Object>(), 
    +                new HashMap<String, Object>(), "SUPERVISOR", null, ls, 
"profile");
    +        
    +        mc.createNewWorkerId();
    +        
    +        assertNotNull(mc._workerId);
    +        verify(ls).getApprovedWorkers();
    +        Map<String, Integer> expectedNewState = new HashMap<String, 
Integer>();
    +        expectedNewState.put(mc._workerId, port);
    +        verify(ls).setApprovedWorkers(expectedNewState);
    +    }
    +    
    +    @Test
    +    public void testRecovery() throws Exception {
    +        final String topoId = "test_topology";
    +        final String workerId = "myWorker";
    +        final int port = 8080;
    +        LocalAssignment la = new LocalAssignment();
    +        la.set_topology_id(topoId);
    +        
    +        Map<String, Integer> workerState = new HashMap<String, Integer>();
    +        workerState.put(workerId, port);
    +        
    +        LocalState ls = mock(LocalState.class);
    +        when(ls.getApprovedWorkers()).thenReturn(workerState);
    +        
    +        MockBasicContainer mc = new MockBasicContainer(port, la, new 
HashMap<String, Object>(), 
    +                "SUPERVISOR", ls, null, true);
    +        
    +        assertEquals(workerId, mc._workerId);
    +    }
    +    
    +    @Test
    +    public void testRecoveryMiss() throws Exception {
    +        final String topoId = "test_topology";
    +        final int port = 8080;
    +        LocalAssignment la = new LocalAssignment();
    +        la.set_topology_id(topoId);
    +        
    +        Map<String, Integer> workerState = new HashMap<String, Integer>();
    +        workerState.put("somethingelse", port+1);
    +        
    +        LocalState ls = mock(LocalState.class);
    +        when(ls.getApprovedWorkers()).thenReturn(workerState);
    +        
    +        try {
    +            new MockBasicContainer(port, la, new HashMap<String, 
Object>(), 
    +                    "SUPERVISOR", ls, null, true);
    +            fail("Container recovered worker incorrectly");
    +        } catch (ContainerRecoveryException e) {
    +            //Expected
    +        }
    +    }
    +    
    +    @Test
    +    public void testCleanUp() throws Exception {
    +        final String topoId = "test_topology";
    +        final int port = 8080;
    +        final String workerId = "worker-id";
    +        LocalAssignment la = new LocalAssignment();
    +        la.set_topology_id(topoId);
    +        
    +        AdvancedFSOps ops = mock(AdvancedFSOps.class);
    +        
    +        Map<String, Integer> workerState = new HashMap<String, Integer>();
    +        workerState.put(workerId, port);
    +        
    +        LocalState ls = mock(LocalState.class);
    +        when(ls.getApprovedWorkers()).thenReturn(new 
HashMap<>(workerState));
    +        
    +        MockBasicContainer mc = new MockBasicContainer(ops, port, la, new 
HashMap<String, Object>(), 
    +                new HashMap<String, Object>(), "SUPERVISOR", null, ls, 
"profile");
    +        mc._workerId = workerId;
    +        
    +        mc.cleanUp();
    +        
    +        assertNull(mc._workerId);
    +        verify(ls).getApprovedWorkers();
    +        Map<String, Integer> expectedNewState = new HashMap<String, 
Integer>();
    +        verify(ls).setApprovedWorkers(expectedNewState);
    +    }
    +    
    +    @Test
    +    public void testRunProfiling() throws Exception {
    +        final long pid = 100;
    +        final String topoId = "test_topology";
    +        final int port = 8080;
    +        final String workerId = "worker-id";
    +        final String stormLocal = ContainerTest.asAbsPath("tmp", 
"testing");
    +        final String topoRoot = ContainerTest.asAbsPath(stormLocal, 
topoId, String.valueOf(port));
    +        final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, 
"worker.pid");
    +        
    +        final Map<String, Object> superConf = new HashMap<>();
    +        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
    +        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
    +        
    +        LocalAssignment la = new LocalAssignment();
    +        la.set_topology_id(topoId);
    +        
    +        AdvancedFSOps ops = mock(AdvancedFSOps.class);
    +        
when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid));
    +        
    +        LocalState ls = mock(LocalState.class);
    +        
    +        MockBasicContainer mc = new MockBasicContainer(ops, port, la, 
superConf, 
    +                new HashMap<String, Object>(), "SUPERVISOR", null, ls, 
"profile");
    +        mc._workerId = workerId;
    +        
    +        //HEAP DUMP
    +        ProfileRequest req = new ProfileRequest();
    +        req.set_action(ProfileAction.JMAP_DUMP);
    +        
    +        mc.runProfiling(req, false);
    +        
    +        assertEquals(1, mc.profileCmds.size());
    +        CommandRun cmd = mc.profileCmds.get(0);
    +        mc.profileCmds.clear();
    +        assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", 
topoRoot), cmd.cmd);
    +        assertEquals(new File(topoRoot), cmd.pwd);
    +        
    +        //JSTACK DUMP
    +        req.set_action(ProfileAction.JSTACK_DUMP);
    +        
    +        mc.runProfiling(req, false);
    +        
    +        assertEquals(1, mc.profileCmds.size());
    +        cmd = mc.profileCmds.get(0);
    +        mc.profileCmds.clear();
    +        assertEquals(Arrays.asList("profile", String.valueOf(pid), 
"jstack", topoRoot), cmd.cmd);
    +        assertEquals(new File(topoRoot), cmd.pwd);
    +        
    +        //RESTART
    +        req.set_action(ProfileAction.JVM_RESTART);
    +        
    +        mc.runProfiling(req, false);
    +        
    +        assertEquals(1, mc.profileCmds.size());
    +        cmd = mc.profileCmds.get(0);
    +        mc.profileCmds.clear();
    +        assertEquals(Arrays.asList("profile", String.valueOf(pid), 
"kill"), cmd.cmd);
    +        assertEquals(new File(topoRoot), cmd.pwd);
    +        
    +        //JPROFILE DUMP
    +        req.set_action(ProfileAction.JPROFILE_DUMP);
    +        
    +        mc.runProfiling(req, false);
    +        
    +        assertEquals(1, mc.profileCmds.size());
    +        cmd = mc.profileCmds.get(0);
    +        mc.profileCmds.clear();
    +        assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", 
topoRoot), cmd.cmd);
    +        assertEquals(new File(topoRoot), cmd.pwd);
    +        
    +        //JPROFILE START
    +        req.set_action(ProfileAction.JPROFILE_STOP);
    --- End diff --
    
    Ya it was really confusing until I asked @kishorvpatil about it and he 
explained it all to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to