Repository: ignite Updated Branches: refs/heads/ignite-2.0 c893da70a -> c5882a85f
IGNITE-4386: Hadoop: implemented client cleanup on protocol close. This closes #1327. This closes #1339. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ffe53eb5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ffe53eb5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ffe53eb5 Branch: refs/heads/ignite-2.0 Commit: ffe53eb5a59908db3684ce11474cb875c4bf392d Parents: a976c42 Author: devozerov <[email protected]> Authored: Mon Dec 12 11:29:23 2016 +0300 Committer: devozerov <[email protected]> Committed: Thu Dec 15 13:46:24 2016 +0300 ---------------------------------------------------------------------- .../IgniteHadoopClientProtocolProvider.java | 70 ++---- .../hadoop/impl/proto/HadoopClientProtocol.java | 55 +++-- .../hadoop/mapreduce/MapReduceClient.java | 147 ++++++++++++ ...opClientProtocolMultipleServersSelfTest.java | 93 +++----- .../client/HadoopClientProtocolSelfTest.java | 228 ++++++++++--------- 5 files changed, 367 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ffe53eb5/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java index 1efe625..920e8b7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopClientProtocolProvider.java @@ -23,24 +23,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; + import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.ConnectorConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.client.GridClient; -import org.apache.ignite.internal.client.GridClientConfiguration; -import org.apache.ignite.internal.client.GridClientException; -import org.apache.ignite.internal.client.GridClientFactory; -import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller; import org.apache.ignite.internal.processors.hadoop.impl.proto.HadoopClientProtocol; -import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient; import org.apache.ignite.internal.util.typedef.F; -import static org.apache.ignite.internal.client.GridClientProtocol.TCP; - /** * Ignite Hadoop client protocol provider. @@ -50,7 +42,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { public static final String FRAMEWORK_NAME = "ignite"; /** Clients. */ - private static final ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, MapReduceClient> cliMap = new ConcurrentHashMap<>(); /** {@inheritDoc} */ @Override public ClientProtocol create(Configuration conf) throws IOException { @@ -91,7 +83,12 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { /** {@inheritDoc} */ @Override public void close(ClientProtocol cliProto) throws IOException { - // No-op. 
+ if (cliProto instanceof HadoopClientProtocol) { + MapReduceClient cli = ((HadoopClientProtocol)cliProto).client(); + + if (cli.release()) + cliMap.remove(cli.cluster(), cli); + } } /** @@ -102,7 +99,7 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { * @return Client protocol. * @throws IOException If failed. */ - private static ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { + private ClientProtocol createProtocol(String addr, Configuration conf) throws IOException { return new HadoopClientProtocol(conf, client(addr, Collections.singletonList(addr))); } @@ -114,45 +111,24 @@ public class IgniteHadoopClientProtocolProvider extends ClientProtocolProvider { * @return Client. * @throws IOException If failed. */ - private static GridClient client(String clusterName, Collection<String> addrs) throws IOException { - try { - IgniteInternalFuture<GridClient> fut = cliMap.get(clusterName); - - if (fut == null) { - GridFutureAdapter<GridClient> fut0 = new GridFutureAdapter<>(); - - IgniteInternalFuture<GridClient> oldFut = cliMap.putIfAbsent(clusterName, fut0); + @SuppressWarnings("unchecked") + private MapReduceClient client(String clusterName, Collection<String> addrs) throws IOException { + while (true) { + MapReduceClient cli = cliMap.get(clusterName); - if (oldFut != null) - return oldFut.get(); - else { - GridClientConfiguration cliCfg = new GridClientConfiguration(); + if (cli == null) { + cli = new MapReduceClient(clusterName, addrs); - cliCfg.setProtocol(TCP); - cliCfg.setServers(addrs); - cliCfg.setMarshaller(new GridClientJdkMarshaller()); - cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day. 
- cliCfg.setDaemon(true); + MapReduceClient oldCli = cliMap.putIfAbsent(clusterName, cli); - try { - GridClient cli = GridClientFactory.start(cliCfg); - - fut0.onDone(cli); - - return cli; - } - catch (GridClientException e) { - fut0.onDone(e); - - throw new IOException("Failed to establish connection with Ignite: " + addrs, e); - } - } + if (oldCli != null) + cli = oldCli; } + + if (cli.acquire()) + return cli; else - return fut.get(); - } - catch (IgniteCheckedException e) { - throw new IOException("Failed to establish connection with Ignite сдгые: " + addrs, e); + cliMap.remove(clusterName, cli); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ffe53eb5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java index be2aa09..7fc0e77 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java @@ -43,7 +43,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient; import org.apache.ignite.internal.client.GridClientException; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; @@ -78,7 +78,7 @@ public class 
HadoopClientProtocol implements ClientProtocol { private final Configuration conf; /** Ignite client. */ - private volatile GridClient cli; + private final MapReduceClient cli; /** Last received version. */ private long lastVer = -1; @@ -90,9 +90,10 @@ public class HadoopClientProtocol implements ClientProtocol { * Constructor. * * @param conf Configuration. - * @param cli Ignite client. + * @param cli Client. */ - public HadoopClientProtocol(Configuration conf, GridClient cli) { + public HadoopClientProtocol(Configuration conf, MapReduceClient cli) { + assert conf != null; assert cli != null; this.conf = conf; @@ -104,7 +105,7 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); + HadoopJobId jobID = execute(HadoopProtocolNextTaskIdTask.class); conf.setLong(HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); @@ -121,8 +122,8 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); - HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); + HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class, + jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)); if (status == null) throw new IOException("Failed to submit job (null status obtained): " + jobId); @@ -157,8 +158,7 @@ public class HadoopClientProtocol implements ClientProtocol { /** {@inheritDoc} */ @Override public void killJob(JobID jobId) throws IOException, InterruptedException { try { - cli.compute().execute(HadoopProtocolKillJobTask.class.getName(), - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + 
execute(HadoopProtocolKillJobTask.class, jobId.getJtIdentifier(), jobId.getId()); } catch (GridClientException e) { throw new IOException("Failed to kill job: " + jobId, e); @@ -181,11 +181,12 @@ public class HadoopClientProtocol implements ClientProtocol { try { Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); - HadoopProtocolTaskArguments args = delay >= 0 ? - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); + HadoopJobStatus status; - HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); + if (delay >= 0) + status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId(), delay); + else + status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId()); if (status == null) throw new IOException("Job tracker doesn't have any information about the job: " + jobId); @@ -200,8 +201,8 @@ public class HadoopClientProtocol implements ClientProtocol { /** {@inheritDoc} */ @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { try { - final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), - new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + final HadoopCounters counters = execute(HadoopProtocolJobCountersTask.class, + jobId.getJtIdentifier(), jobId.getId()); if (counters == null) throw new IOException("Job tracker doesn't have any information about the job: " + jobId); @@ -329,6 +330,21 @@ public class HadoopClientProtocol implements ClientProtocol { } /** + * Execute task. + * + * @param taskCls Task class. + * @param args Arguments. + * @return Result. + * @throws IOException If failed. + * @throws GridClientException If failed. + */ + private <T> T execute(Class taskCls, Object... 
args) throws IOException, GridClientException { + HadoopProtocolTaskArguments args0 = args != null ? new HadoopProtocolTaskArguments(args) : null; + + return cli.client().compute().execute(taskCls.getName(), args0); + } + + /** * Process received status update. * * @param status Ignite status. @@ -351,4 +367,13 @@ public class HadoopClientProtocol implements ClientProtocol { return HadoopUtils.status(lastStatus, conf); } + + /** + * Gets the GridClient data. + * + * @return The client data. + */ + public MapReduceClient client() { + return cli; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ffe53eb5/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java new file mode 100644 index 0000000..3d52176 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/mapreduce/MapReduceClient.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.mapreduce; + +import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientException; +import org.apache.ignite.internal.client.GridClientFactory; +import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.ignite.internal.client.GridClientProtocol.TCP; + +/** + * Client. + */ +public class MapReduceClient { + /** Cluster name. */ + private final String cluster; + + /** Addresses. */ + private final Collection<String> addrs; + + /** Mutex. */ + private final Object mux = new Object(); + + /** Usage counter. */ + private final AtomicInteger cnt = new AtomicInteger(); + + /** Client. */ + private volatile GridClient cli; + + /** + * Constructor. + * + * @param cluster Cluster name. + * @param addrs Addresses. + */ + public MapReduceClient(String cluster, Collection<String> addrs) { + this.cluster = cluster; + this.addrs = addrs; + } + + /** + * @return Cluster name. + */ + public String cluster() { + return cluster; + } + + /** + * Gets the client. + * + * @return The client. + */ + public GridClient client() throws IOException { + GridClient cli0 = cli; + + if (cli0 == null) { + synchronized (mux) { + cli0 = cli; + + if (cli0 == null) { + GridClientConfiguration cliCfg = new GridClientConfiguration(); + + cliCfg.setProtocol(TCP); + cliCfg.setServers(addrs); + cliCfg.setMarshaller(new GridClientJdkMarshaller()); + cliCfg.setMaxConnectionIdleTime(24 * 60 * 60 * 1000L); // 1 day. 
+ cliCfg.setDaemon(true); + + try { + cli0 = GridClientFactory.start(cliCfg); + + cli = cli0; + } + catch (GridClientException e) { + throw new IOException("Failed to establish connection with Ignite: " + addrs, e); + } + } + } + } + + return cli0; + } + + /** + * Increments usage count. + * + * @return {@code True} if succeeded and client can be used. + */ + public boolean acquire() { + while (true) { + int cur = cnt.get(); + + if (cur < 0) + return false; + + int next = cur + 1; + + if (cnt.compareAndSet(cur, next)) + return true; + } + } + + /** + * Decrements the usages of the client and closes it if this is the last usage. + * + * @return {@code True} if client can be closed safely by the caller. + */ + public boolean release() { + int cnt0 = cnt.decrementAndGet(); + + assert cnt0 >= 0; + + if (cnt0 == 0) { + if (cnt.compareAndSet(0, -1)) { + GridClient cli0 = cli; + + if (cli0 != null) + cli0.close(); + + return true; + } + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ffe53eb5/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java index 0805be1..a4b5e6a 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolMultipleServersSelfTest.java @@ -23,8 +23,8 @@ import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Callable; -import 
java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -40,13 +40,10 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider; import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridServerUnreachableException; import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; @@ -79,34 +76,12 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS } /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - clearClients(); - } - - /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); - clearClients(); - super.afterTest(); } - /** - * @throws IgniteCheckedException If failed. 
- */ - private void clearConnectionMap() throws IgniteCheckedException { - ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = - GridTestUtils.getFieldValue(IgniteHadoopClientProtocolProvider.class, "cliMap"); - - for(IgniteInternalFuture<GridClient> fut : cliMap.values()) - fut.get().close(); - - cliMap.clear(); - } - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -117,18 +92,6 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS } /** - * - */ - private void clearClients() { - ConcurrentHashMap<String, IgniteInternalFuture<GridClient>> cliMap = GridTestUtils.getFieldValue( - IgniteHadoopClientProtocolProvider.class, - IgniteHadoopClientProtocolProvider.class, - "cliMap"); - - cliMap.clear(); - } - - /** * @throws Exception If failed. */ private void beforeJob() throws Exception { @@ -154,26 +117,31 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS private void checkJobSubmit(Configuration conf) throws Exception { final Job job = Job.getInstance(conf); - job.setJobName(JOB_NAME); + try { + job.setJobName(JOB_NAME); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); - job.setInputFormatClass(TextInputFormat.class); - job.setOutputFormatClass(OutFormat.class); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(OutFormat.class); - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); - job.setNumReduceTasks(0); + job.setNumReduceTasks(0); - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - job.submit(); + job.submit(); - 
job.waitForCompletion(false); + job.waitForCompletion(false); - assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState(); + assert job.getStatus().getState() == JobStatus.State.SUCCEEDED : job.getStatus().getState(); + } + finally { + job.getCluster().close(); + } } /** @@ -197,18 +165,25 @@ public class HadoopClientProtocolMultipleServersSelfTest extends HadoopAbstractS */ @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) public void testSingleAddress() throws Exception { - // Don't use REST_PORT to test connection fails if the only this port is configured - restPort = REST_PORT + 1; + try { + // Don't use REST_PORT to test connection fails if the only this port is configured + restPort = REST_PORT + 1; - startGrids(gridCount()); + startGrids(gridCount()); - GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() { + GridTestUtils.assertThrowsAnyCause(log, new Callable<Object>() { @Override public Object call() throws Exception { - checkJobSubmit(configSingleAddress()); - return null; - } - }, - GridServerUnreachableException.class, "Failed to connect to any of the servers in list"); + checkJobSubmit(configSingleAddress()); + return null; + } + }, + GridServerUnreachableException.class, "Failed to connect to any of the servers in list"); + } + finally { + FileSystem fs = FileSystem.get(configSingleAddress()); + + fs.close(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ffe53eb5/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java index 1ef7dd0..7156a3d 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/client/HadoopClientProtocolSelfTest.java @@ -50,7 +50,6 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider; import org.apache.ignite.igfs.IgfsFile; import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest; import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -115,7 +114,6 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { stopAllGrids(); super.afterTestsStopped(); -// IgniteHadoopClientProtocolProvider.cliMap.clear(); } /** {@inheritDoc} */ @@ -196,43 +194,48 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { final Job job = Job.getInstance(conf); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); + try { + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); - job.setMapperClass(TestCountingMapper.class); - job.setReducerClass(TestCountingReducer.class); - job.setCombinerClass(TestCountingCombiner.class); + job.setMapperClass(TestCountingMapper.class); + job.setReducerClass(TestCountingReducer.class); + job.setCombinerClass(TestCountingCombiner.class); - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); - job.submit(); + job.submit(); - final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1); + final Counter cntr = 
job.getCounters().findCounter(TestCounter.COUNTER1); - assertEquals(0, cntr.getValue()); + assertEquals(0, cntr.getValue()); - cntr.increment(10); + cntr.increment(10); - assertEquals(10, cntr.getValue()); + assertEquals(10, cntr.getValue()); - // Transferring to map phase. - setupLockFile.delete(); + // Transferring to map phase. + setupLockFile.delete(); - // Transferring to reduce phase. - mapLockFile.delete(); + // Transferring to reduce phase. + mapLockFile.delete(); - job.waitForCompletion(false); + job.waitForCompletion(false); - assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState()); + assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState()); - final Counters counters = job.getCounters(); + final Counters counters = job.getCounters(); - assertNotNull("counters cannot be null", counters); - assertEquals("wrong counters count", 3, counters.countCounters()); - assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue()); - assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue()); - assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue()); + assertNotNull("counters cannot be null", counters); + assertEquals("wrong counters count", 3, counters.countCounters()); + assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue()); + assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue()); + assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue()); + } + finally { + job.getCluster().close(); + } } /** @@ -304,114 +307,119 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { final Job job = Job.getInstance(conf); - job.setJobName(JOB_NAME); + try { + job.setJobName(JOB_NAME); - job.setOutputKeyClass(Text.class); - 
job.setOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); - if (!noCombiners) - job.setCombinerClass(TestCombiner.class); + if (!noCombiners) + job.setCombinerClass(TestCombiner.class); - if (noReducers) - job.setNumReduceTasks(0); + if (noReducers) + job.setNumReduceTasks(0); - job.setInputFormatClass(TextInputFormat.class); - job.setOutputFormatClass(TestOutputFormat.class); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TestOutputFormat.class); - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); - job.submit(); + job.submit(); - JobID jobId = job.getJobID(); + JobID jobId = job.getJobID(); - // Setup phase. - JobStatus jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f; - assert jobStatus.getMapProgress() == 0.0f; - assert jobStatus.getReduceProgress() == 0.0f; + // Setup phase. 
+ JobStatus jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f; + assert jobStatus.getMapProgress() == 0.0f; + assert jobStatus.getReduceProgress() == 0.0f; - U.sleep(2100); + U.sleep(2100); - JobStatus recentJobStatus = job.getStatus(); + JobStatus recentJobStatus = job.getStatus(); - assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : - "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress(); + assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : "Old=" + + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress(); - // Transferring to map phase. - setupLockFile.delete(); + // Transferring to map phase. + setupLockFile.delete(); - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - return F.eq(1.0f, job.getStatus().getSetupProgress()); + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return F.eq(1.0f, job.getStatus().getSetupProgress()); + } + catch (Exception e) { + throw new RuntimeException("Unexpected exception.", e); + } } - catch (Exception e) { - throw new RuntimeException("Unexpected exception.", e); - } - } - }, 5000L); + }, 5000L); - // Map phase. - jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f; - assert jobStatus.getReduceProgress() == 0.0f; + // Map phase. 
+ jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f; + assert jobStatus.getReduceProgress() == 0.0f; - U.sleep(2100); + U.sleep(2100); - recentJobStatus = job.getStatus(); + recentJobStatus = job.getStatus(); - assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : - "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress(); + assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : "Old=" + jobStatus.getMapProgress() + + ", new=" + recentJobStatus.getMapProgress(); - // Transferring to reduce phase. - mapLockFile.delete(); + // Transferring to reduce phase. + mapLockFile.delete(); - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - return F.eq(1.0f, job.getStatus().getMapProgress()); - } - catch (Exception e) { - throw new RuntimeException("Unexpected exception.", e); + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return F.eq(1.0f, job.getStatus().getMapProgress()); + } + catch (Exception e) { + throw new RuntimeException("Unexpected exception.", e); + } } - } - }, 5000L); + }, 5000L); - if (!noReducers) { - // Reduce phase. - jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() == 1.0f; - assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f; + if (!noReducers) { + // Reduce phase. 
+ jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() == 1.0f; + assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f; - // Ensure that reduces progress increases. - U.sleep(2100); + // Ensure that reduces progress increases. + U.sleep(2100); - recentJobStatus = job.getStatus(); + recentJobStatus = job.getStatus(); - assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : - "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress(); + assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : "Old=" + + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress(); - reduceLockFile.delete(); - } + reduceLockFile.delete(); + } - job.waitForCompletion(false); + job.waitForCompletion(false); - jobStatus = job.getStatus(); - checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() == 1.0f; - assert jobStatus.getReduceProgress() == 1.0f; + jobStatus = job.getStatus(); + checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() == 1.0f; + assert jobStatus.getReduceProgress() == 1.0f; - dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT)); + dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT)); + } + finally { + job.getCluster().close(); + } } /** @@ -517,7 +525,12 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * Test Hadoop counters. */ public enum TestCounter { - COUNTER1, COUNTER2, COUNTER3 + /** */ + COUNTER1, + /** */ + COUNTER2, + /** */ + COUNTER3 } /** @@ -535,6 +548,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * Test combiner that counts invocations. 
*/ public static class TestCountingCombiner extends TestReducer { + /** {@inheritDoc} */ @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException { ctx.getCounter(TestCounter.COUNTER1).increment(1); @@ -552,6 +566,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * Test reducer that counts invocations. */ public static class TestCountingReducer extends TestReducer { + /** {@inheritDoc} */ @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException { ctx.getCounter(TestCounter.COUNTER1).increment(1); @@ -566,6 +581,9 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { // No-op. } + /** + * Test output format. + */ public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> { /** {@inheritDoc} */ @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
