IGNITE-4428: Hadoop: moved HadoopMapReducePlanner and dependent classes to public space. This closes #1389. This closes #1394.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d14e0727 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d14e0727 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d14e0727 Branch: refs/heads/ignite-gg-11810-1 Commit: d14e0727b3dd61ab5ec2957133d77dbc25e9ba68 Parents: 77ca2e6 Author: tledkov-gridgain <tled...@gridgain.com> Authored: Mon Jan 16 16:36:25 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Mon Jan 16 16:36:25 2017 +0300 ---------------------------------------------------------------------- .../configuration/HadoopConfiguration.java | 2 +- .../apache/ignite/hadoop/HadoopInputSplit.java | 54 +++++++ .../org/apache/ignite/hadoop/HadoopJob.java | 74 ++++++++++ .../ignite/hadoop/HadoopMapReducePlan.java | 80 +++++++++++ .../ignite/hadoop/HadoopMapReducePlanner.java | 40 ++++++ .../processors/hadoop/HadoopDefaultJobInfo.java | 4 +- .../processors/hadoop/HadoopFileBlock.java | 1 + .../processors/hadoop/HadoopInputSplit.java | 54 ------- .../internal/processors/hadoop/HadoopJob.java | 107 -------------- .../internal/processors/hadoop/HadoopJobEx.java | 140 +++++++++++++++++++ .../processors/hadoop/HadoopJobInfo.java | 54 +++---- .../processors/hadoop/HadoopMapReducePlan.java | 80 ----------- .../hadoop/HadoopMapReducePlanner.java | 40 ------ .../processors/hadoop/HadoopTaskContext.java | 6 +- .../processors/hadoop/HadoopTaskInfo.java | 1 + .../hadoop/counter/HadoopCounterWriter.java | 4 +- .../resources/META-INF/classnames.properties | 4 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 6 +- .../IgniteHadoopWeightedMapReducePlanner.java | 10 +- .../planner/HadoopAbstractMapReducePlanner.java | 118 ++++++++++++++++ .../planner/HadoopTestRoundRobinMrPlanner.java | 75 ++++++++++ .../processors/hadoop/HadoopCommonUtils.java | 1 + .../processors/hadoop/HadoopContext.java | 2 + .../processors/hadoop/HadoopExternalSplit.java | 1 + 
.../processors/hadoop/HadoopSplitWrapper.java | 1 + .../HadoopFileSystemCounterWriterDelegate.java | 4 +- ...doopFileSystemCounterWriterDelegateImpl.java | 4 +- .../hadoop/impl/v1/HadoopV1MapTask.java | 6 +- .../hadoop/impl/v1/HadoopV1ReduceTask.java | 4 +- .../hadoop/impl/v1/HadoopV1Splitter.java | 2 +- .../hadoop/impl/v2/HadoopV2Context.java | 2 +- .../processors/hadoop/impl/v2/HadoopV2Job.java | 15 +- .../hadoop/impl/v2/HadoopV2Splitter.java | 2 +- .../hadoop/impl/v2/HadoopV2TaskContext.java | 6 +- .../hadoop/jobtracker/HadoopJobMetadata.java | 4 +- .../hadoop/jobtracker/HadoopJobTracker.java | 32 ++--- .../planner/HadoopAbstractMapReducePlanner.java | 116 --------------- .../planner/HadoopDefaultMapReducePlan.java | 4 +- .../hadoop/shuffle/HadoopShuffle.java | 4 +- .../hadoop/shuffle/HadoopShuffleJob.java | 7 +- .../HadoopEmbeddedTaskExecutor.java | 8 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 6 +- .../taskexecutor/HadoopTaskExecutorAdapter.java | 8 +- .../external/HadoopExternalTaskExecutor.java | 16 +-- .../child/HadoopChildProcessRunner.java | 4 +- .../resources/META-INF/classnames.properties | 4 +- .../hadoop/impl/HadoopCommandLineTest.java | 4 +- .../hadoop/impl/HadoopJobTrackerSelfTest.java | 1 + .../hadoop/impl/HadoopPlannerMockJob.java | 10 +- .../hadoop/impl/HadoopTasksAllVersionsTest.java | 16 +-- .../hadoop/impl/HadoopTasksV1Test.java | 4 +- .../hadoop/impl/HadoopTasksV2Test.java | 4 +- .../impl/HadoopTestRoundRobinMrPlanner.java | 75 ---------- .../hadoop/impl/HadoopTestTaskContext.java | 6 +- .../hadoop/impl/HadoopV2JobSelfTest.java | 6 +- .../HadoopWeightedMapReducePlannerTest.java | 6 +- .../collections/HadoopAbstractMapTest.java | 4 +- 57 files changed, 738 insertions(+), 615 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java index 84014d6..6443a67 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java @@ -18,7 +18,7 @@ package org.apache.ignite.configuration; import org.apache.ignite.lifecycle.LifecycleBean; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; +import org.apache.ignite.hadoop.HadoopMapReducePlanner; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java new file mode 100644 index 0000000..4138e64 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import java.io.Externalizable; + +/** + * Abstract fragment of an input data source. + */ +public abstract class HadoopInputSplit implements Externalizable { + /** */ + protected String[] hosts; + + /** + * Array of hosts where this input split resides. + * + * @return Hosts. + */ + public String[] hosts() { + assert hosts != null; + + return hosts; + } + + /** + * This method must be implemented for purpose of internal implementation. + * + * @param obj Another object. + * @return {@code true} If objects are equal. + */ + @Override public abstract boolean equals(Object obj); + + /** + * This method must be implemented for purpose of internal implementation. + * + * @return Hash code of the object. + */ + @Override public abstract int hashCode(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java new file mode 100644 index 0000000..8ee0330 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.hadoop; + +import java.util.Collection; +import org.jetbrains.annotations.Nullable; + +/** + * Compact job description. + */ +public interface HadoopJob { + /** + * Gets collection of input splits for this job. + * + * @return Input splits. + */ + public Collection<HadoopInputSplit> input(); + + /** + * Gets optional configuration property for the job. + * + * @param name Property name. + * @return Value or {@code null} if none. + */ + @Nullable String property(String name); + + /** + * Checks whether job has combiner. + * + * @return {@code true} If job has combiner. + */ + boolean hasCombiner(); + + /** + * Checks whether job has reducer. + * Actual number of reducers will be in {@link HadoopMapReducePlan#reducers()}. + * + * @return {@code true} If job has reducer. + */ + boolean hasReducer(); + + /** + * @return Number of reducers configured for job. + */ + int reducers(); + + /** + * Gets job name. + * + * @return Job name. + */ + String jobName(); + + /** + * Gets user name. + * + * @return User name. 
+ */ + String user(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java new file mode 100644 index 0000000..f77fb64 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import java.io.Serializable; +import java.util.Collection; +import java.util.UUID; +import org.jetbrains.annotations.Nullable; + +/** + * Map-reduce job execution plan. + */ +public interface HadoopMapReducePlan extends Serializable { + /** + * Gets collection of file blocks for which mappers should be executed. + * + * @param nodeId Node ID to check. + * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. 
+ */ + @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId); + + /** + * Gets reducer IDs that should be started on given node. + * + * @param nodeId Node ID to check. + * @return Array of reducer IDs. + */ + @Nullable public int[] reducers(UUID nodeId); + + /** + * Gets collection of all node IDs involved in map part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> mapperNodeIds(); + + /** + * Gets collection of all node IDs involved in reduce part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> reducerNodeIds(); + + /** + * Gets overall number of mappers for the job. + * + * @return Number of mappers. + */ + public int mappers(); + + /** + * Gets overall number of reducers for the job. + * + * @return Number of reducers. + */ + public int reducers(); + + /** + * Gets node ID for reducer. + * + * @param reducer Reducer. + * @return Node ID. + */ + public UUID nodeForReducer(int reducer); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java new file mode 100644 index 0000000..8d77b70 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.jetbrains.annotations.Nullable; + +/** + * Map-reduce execution planner. + */ +public interface HadoopMapReducePlanner { + /** + * Prepares map-reduce execution plan for the given job and topology. + * + * @param job Job. + * @param top Topology. + * @param oldPlan Old plan in case of partial failure. + * @return Map reduce plan. + * @throws IgniteCheckedException If an error occurs. + */ + public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index ae17ac8..ab38e4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -82,12 +82,12 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override 
public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, + @Override public HadoopJobEx createJob(Class<? extends HadoopJobEx> jobCls, HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException { assert jobCls != null; try { - Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class, + Constructor<? extends HadoopJobEx> constructor = jobCls.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class); return constructor.newInstance(jobId, this, log, libNames, helper); http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java index bc665eb..351abce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java @@ -22,6 +22,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.net.URI; import java.util.Arrays; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java deleted file mode 100644 index 998cb61..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.Externalizable; - -/** - * Abstract fragment of an input data source. - */ -public abstract class HadoopInputSplit implements Externalizable { - /** */ - protected String[] hosts; - - /** - * Array of hosts where this input split resides. - * - * @return Hosts. - */ - public String[] hosts() { - assert hosts != null; - - return hosts; - } - - /** - * This method must be implemented for purpose of internal implementation. - * - * @param obj Another object. - * @return {@code true} If objects are equal. - */ - @Override public abstract boolean equals(Object obj); - - /** - * This method must be implemented for purpose of internal implementation. - * - * @return Hash code of the object. 
- */ - @Override public abstract int hashCode(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java deleted file mode 100644 index a77c744..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.util.Collection; -import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; - -/** - * Hadoop job. - */ -public interface HadoopJob { - /** - * Gets job ID. - * - * @return Job ID. - */ - public HadoopJobId id(); - - /** - * Gets job information. - * - * @return Job information. - */ - public HadoopJobInfo info(); - - /** - * Gets collection of input splits for this job. - * - * @return Input splits. 
- */ - public Collection<HadoopInputSplit> input() throws IgniteCheckedException; - - /** - * Returns context for task execution. - * - * @param info Task info. - * @return Task Context. - * @throws IgniteCheckedException If failed. - */ - public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException; - - /** - * Does all the needed initialization for the job. Will be called on each node where tasks for this job must - * be executed. - * <p> - * If job is running in external mode this method will be called on instance in Ignite node with parameter - * {@code false} and on instance in external process with parameter {@code true}. - * - * @param external If {@code true} then this job instance resides in external process. - * @param locNodeId Local node ID. - * @throws IgniteCheckedException If failed. - */ - public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException; - - /** - * Release all the resources. - * <p> - * If job is running in external mode this method will be called on instance in Ignite node with parameter - * {@code false} and on instance in external process with parameter {@code true}. - * - * @param external If {@code true} then this job instance resides in external process. - * @throws IgniteCheckedException If failed. - */ - public void dispose(boolean external) throws IgniteCheckedException; - - /** - * Prepare local environment for the task. - * - * @param info Task info. - * @throws IgniteCheckedException If failed. - */ - public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; - - /** - * Cleans up local environment of the task. - * - * @param info Task info. - * @throws IgniteCheckedException If failed. - */ - public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; - - /** - * Cleans up the job staging directory. - */ - public void cleanupStagingDirectory(); - - /** - * @return Ignite work directory. 
- */ - public String igniteWorkDirectory(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java new file mode 100644 index 0000000..ba78af9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopJob; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop job. + */ +public abstract class HadoopJobEx implements HadoopJob { + /** + * Gets job ID. + * + * @return Job ID. + */ + abstract public HadoopJobId id(); + + /** + * Gets job information. 
+ * + * @return Job information. + */ + abstract public HadoopJobInfo info(); + + /** + * Gets collection of input splits for this job. + * + * @return Input splits. + */ + abstract public Collection<HadoopInputSplit> input(); + + /** + * Returns context for task execution. + * + * @param info Task info. + * @return Task Context. + * @throws IgniteCheckedException If failed. + */ + abstract public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Does all the needed initialization for the job. Will be called on each node where tasks for this job must + * be executed. + * <p> + * If job is running in external mode this method will be called on instance in Ignite node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @param locNodeId Local node ID. + * @throws IgniteCheckedException If failed. + */ + abstract public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException; + + /** + * Release all the resources. + * <p> + * If job is running in external mode this method will be called on instance in Ignite node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @throws IgniteCheckedException If failed. + */ + abstract public void dispose(boolean external) throws IgniteCheckedException; + + /** + * Prepare local environment for the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + abstract public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. 
+ */ + abstract public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up the job staging directory. + */ + abstract public void cleanupStagingDirectory(); + + /** + * @return Ignite work directory. + */ + abstract public String igniteWorkDirectory(); + + /** {@inheritDoc} */ + @Nullable @Override public String property(String name) { + return info().property(name); + } + + /** {@inheritDoc} */ + @Override public boolean hasCombiner() { + return info().hasCombiner(); + } + + /** {@inheritDoc} */ + @Override public boolean hasReducer() { + return info().hasReducer(); + } + + /** {@inheritDoc} */ + @Override public int reducers() { + return info().reducers(); + } + + /** {@inheritDoc} */ + @Override public String jobName() { + return info().jobName(); + } + + /** {@inheritDoc} */ + @Override public String user() { + return info().user(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index 853c63d..4cc8f80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -17,29 +17,29 @@ package org.apache.ignite.internal.processors.hadoop; -import java.io.Serializable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.hadoop.HadoopMapReducePlan; import org.jetbrains.annotations.Nullable; /** - * Compact job description. + * Extended job description. 
*/ -public interface HadoopJobInfo extends Serializable { +public interface HadoopJobInfo { /** * Gets optional configuration property for the job. * * @param name Property name. * @return Value or {@code null} if none. */ - @Nullable public String property(String name); + @Nullable String property(String name); /** * Checks whether job has combiner. * * @return {@code true} If job has combiner. */ - public boolean hasCombiner(); + boolean hasCombiner(); /** * Checks whether job has reducer. @@ -47,42 +47,42 @@ public interface HadoopJobInfo extends Serializable { * * @return Number of reducer. */ - public boolean hasReducer(); - - /** - * Creates new job instance for the given ID. - * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution. - * This method will be called once for the same ID on one node, though it can be called on the same host - * multiple times from different processes (in case of multiple nodes on the same host or external execution). - * - * @param jobCls The job class. - * @param jobId Job ID. - * @param log Logger. - * @param libNames Optional additional native library names. - * @param helper HadoopHelper. - * @return Job. - * @throws IgniteCheckedException If failed. - */ - public HadoopJob createJob(Class<? extends HadoopJob> jobCls, - HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper) - throws IgniteCheckedException; + boolean hasReducer(); /** * @return Number of reducers configured for job. */ - public int reducers(); + int reducers(); /** * Gets job name. * * @return Job name. */ - public String jobName(); + String jobName(); /** * Gets user name. * * @return User name. */ - public String user(); + String user(); + + /** + * Creates new job instance for the given ID. + * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution. 
+ * This method will be called once for the same ID on one node, though it can be called on the same host + * multiple times from different processes (in case of multiple nodes on the same host or external execution). + * + * @param jobCls The job class. + * @param jobId Job ID. + * @param log Logger. + * @param libNames Optional additional native library names. + * @param helper HadoopHelper. + * @return Job. + * @throws IgniteCheckedException If failed. + */ + public HadoopJobEx createJob(Class<? extends HadoopJobEx> jobCls, + HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper) + throws IgniteCheckedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java deleted file mode 100644 index aadc2bf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.Serializable; -import java.util.Collection; -import java.util.UUID; -import org.jetbrains.annotations.Nullable; - -/** - * Map-reduce job execution plan. - */ -public interface HadoopMapReducePlan extends Serializable { - /** - * Gets collection of file blocks for which mappers should be executed. - * - * @param nodeId Node ID to check. - * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. - */ - @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId); - - /** - * Gets reducer IDs that should be started on given node. - * - * @param nodeId Node ID to check. - * @return Array of reducer IDs. - */ - @Nullable public int[] reducers(UUID nodeId); - - /** - * Gets collection of all node IDs involved in map part of job execution. - * - * @return Collection of node IDs. - */ - public Collection<UUID> mapperNodeIds(); - - /** - * Gets collection of all node IDs involved in reduce part of job execution. - * - * @return Collection of node IDs. - */ - public Collection<UUID> reducerNodeIds(); - - /** - * Gets overall number of mappers for the job. - * - * @return Number of mappers. - */ - public int mappers(); - - /** - * Gets overall number of reducers for the job. - * - * @return Number of reducers. - */ - public int reducers(); - - /** - * Gets node ID for reducer. - * - * @param reducer Reducer. - * @return Node ID. 
- */ - public UUID nodeForReducer(int reducer); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java deleted file mode 100644 index 0009c4a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.util.Collection; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cluster.ClusterNode; -import org.jetbrains.annotations.Nullable; - -/** - * Map-reduce execution planner. - */ -public interface HadoopMapReducePlanner { - /** - * Prepares map-reduce execution plan for the given job and topology. - * - * @param job Job. - * @param top Topology. 
- * @param oldPlan Old plan in case of partial failure. - * @return Map reduce plan. - * @throws IgniteCheckedException If an error occurs. - */ - public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index dddd017..194c1dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawCompar */ public abstract class HadoopTaskContext { /** */ - protected final HadoopJob job; + protected final HadoopJobEx job; /** */ private HadoopTaskInput input; @@ -44,7 +44,7 @@ public abstract class HadoopTaskContext { * @param taskInfo Task info. * @param job Job. */ - protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJob job) { + protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job) { this.taskInfo = taskInfo; this.job = job; } @@ -88,7 +88,7 @@ public abstract class HadoopTaskContext { /** * @return Job. 
*/ - public HadoopJob job() { + public HadoopJobEx job() { return job; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java index 3509367..eb3113c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java index 6c033b2..93a69db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.counter; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; /** * The object 
that writes some system counters to some storage for each running job. This operation is a part of @@ -32,5 +32,5 @@ public interface HadoopCounterWriter { * @param cntrs Counters. * @throws IgniteCheckedException If failed. */ - public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException; + public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 212e94a..02bad40 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1110,13 +1110,13 @@ org.apache.ignite.internal.processors.dr.GridDrType org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo org.apache.ignite.internal.processors.hadoop.HadoopFileBlock -org.apache.ignite.internal.processors.hadoop.HadoopInputSplit +org.apache.ignite.hadoop.HadoopInputSplit org.apache.ignite.internal.processors.hadoop.HadoopJobId org.apache.ignite.internal.processors.hadoop.HadoopJobInfo org.apache.ignite.internal.processors.hadoop.HadoopJobPhase org.apache.ignite.internal.processors.hadoop.HadoopJobProperty org.apache.ignite.internal.processors.hadoop.HadoopJobStatus -org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan +org.apache.ignite.hadoop.HadoopMapReducePlan org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo org.apache.ignite.internal.processors.hadoop.HadoopTaskType org.apache.ignite.internal.processors.hadoop.message.HadoopMessage 
http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index f1c1b16..1128fa4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -18,7 +18,7 @@ package org.apache.ignite.hadoop.fs; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils; @@ -41,7 +41,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter private volatile HadoopFileSystemCounterWriterDelegate delegate; /** {@inheritDoc} */ - @Override public void write(HadoopJob job, HadoopCounters cntrs) + @Override public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException { delegate(job).write(job, cntrs); } @@ -52,7 +52,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter * @param job Job. * @return Delegate. 
*/ - private HadoopFileSystemCounterWriterDelegate delegate(HadoopJob job) { + private HadoopFileSystemCounterWriterDelegate delegate(HadoopJobEx job) { HadoopFileSystemCounterWriterDelegate delegate0 = delegate; if (delegate0 == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java index 178cdb5..bb0b47f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java @@ -21,16 +21,16 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.hadoop.HadoopJob; import org.apache.ignite.igfs.IgfsBlockLocation; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopMapReducePlan; import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint; -import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner; +import 
org.apache.ignite.hadoop.planner.HadoopAbstractMapReducePlanner; import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup; import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology; @@ -117,7 +117,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes, @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input()); - int reducerCnt = job.info().reducers(); + int reducerCnt = job.reducers(); if (reducerCnt < 0) throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt); http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java new file mode 100644 index 0000000..dd01f11 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop.planner; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.hadoop.HadoopMapReducePlanner; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; + +/** + * Base class for map-reduce planners. + */ +public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner { + /** Injected grid. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Logger. */ + @SuppressWarnings("UnusedDeclaration") + @LoggerResource + protected IgniteLogger log; + + /** + * Create plan topology. + * + * @param nodes Topology nodes. + * @return Plan topology. 
+ */ + protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) { + Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size()); + + Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size()); + Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size()); + + for (ClusterNode node : nodes) { + String macs = node.attribute(ATTR_MACS); + + HadoopMapReducePlanGroup grp = macsMap.get(macs); + + if (grp == null) { + grp = new HadoopMapReducePlanGroup(node, macs); + + macsMap.put(macs, grp); + } + else + grp.add(node); + + idToGrp.put(node.id(), grp); + + for (String host : node.addresses()) { + HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host); + + if (hostGrp == null) + hostToGrp.put(host, grp); + else + assert hostGrp == grp; + } + } + + return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp); + } + + + /** + * Groups nodes by host names. + * + * @param top Topology to group. + * @return Map. + */ + protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) { + Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); + + for (ClusterNode node : top) { + for (String host : node.hostNames()) { + Collection<UUID> nodeIds = grouped.get(host); + + if (nodeIds == null) { + // Expecting 1-2 nodes per host. 
+ nodeIds = new ArrayList<>(2); + + grouped.put(host, nodeIds); + } + + nodeIds.add(node.id()); + } + } + + return grouped; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java new file mode 100644 index 0000000..d9de0c1 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop.planner; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopJob; +import org.apache.ignite.hadoop.HadoopMapReducePlan; +import org.apache.ignite.hadoop.HadoopMapReducePlanner; +import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan; +import org.jetbrains.annotations.Nullable; + +/** + * Round-robin mr planner. + */ +public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner { + /** {@inheritDoc} */ + @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { + if (top.isEmpty()) + throw new IllegalArgumentException("Topology is empty"); + + // Has at least one element. 
+ Iterator<ClusterNode> it = top.iterator(); + + Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); + + for (HadoopInputSplit block : job.input()) { + ClusterNode node = it.next(); + + Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id()); + + if (nodeBlocks == null) { + nodeBlocks = new ArrayList<>(); + + mappers.put(node.id(), nodeBlocks); + } + + nodeBlocks.add(block); + + if (!it.hasNext()) + it = top.iterator(); + } + + int[] rdc = new int[job.reducers()]; + + for (int i = 0; i < rdc.length; i++) + rdc[i] = i; + + return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java index 37af147..7e74d82 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.jetbrains.annotations.Nullable; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java index 4326ad2..f125485 
100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.HadoopConfiguration; +import org.apache.ignite.hadoop.HadoopMapReducePlan; +import org.apache.ignite.hadoop.HadoopMapReducePlanner; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata; import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java index a9b4532..7db535a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.util.typedef.internal.S; import java.io.Externalizable; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java index fb6d0f3..fe5d434 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java @@ -22,6 +22,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Arrays; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java index 541cf80..0d82b5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.hadoop.delegate; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; /** @@ -32,5 +32,5 @@ public interface HadoopFileSystemCounterWriterDelegate { * @param cntrs Counters. * @throws IgniteCheckedException If failed. 
*/ - public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException; + public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java index d4c10da..6b36d26 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java @@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter; import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; @@ -60,7 +60,7 @@ public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSyst } /** {@inheritDoc} */ - public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException { + public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException { Configuration hadoopCfg = 
HadoopUtils.safeCreateConfiguration(); final HadoopJobInfo jobInfo = job.info(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java index 65ff280..cde6da6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java @@ -28,8 +28,8 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -54,7 +54,7 @@ public class HadoopV1MapTask extends HadoopV1Task { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { - HadoopJob job = taskCtx.job(); + HadoopJobEx job = taskCtx.job(); HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java index 92c024e..6b90653 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java @@ -22,7 +22,7 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -51,7 +51,7 @@ public class HadoopV1ReduceTask extends HadoopV1Task { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { - HadoopJob job = taskCtx.job(); + HadoopJobEx job = taskCtx.job(); HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java index 11a3598..26325b9 100644 --- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java @@ -24,7 +24,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java index eec0636..11f2ecc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java @@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java index a24e581..5d3f22d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.split.JobSplit; import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; @@ -37,8 +38,8 @@ import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; import org.apache.ignite.internal.processors.hadoop.HadoopHelper; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; @@ -85,7 +86,7 @@ import static 
org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSys /** * Hadoop job implementation for v2 API. */ -public class HadoopV2Job implements HadoopJob { +public class HadoopV2Job extends HadoopJobEx { /** */ private final JobConf jobConf; @@ -139,6 +140,7 @@ public class HadoopV2Job implements HadoopJob { * @param jobInfo Job info. * @param log Logger. * @param libNames Optional additional native library names. + * @param helper Hadoop helper. */ public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper) { @@ -182,7 +184,7 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ - @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { + @Override public Collection<HadoopInputSplit> input() { ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf.getClassLoader()); try { @@ -239,6 +241,9 @@ public class HadoopV2Job implements HadoopJob { throw transformException(e); } } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } finally { HadoopCommonUtils.restoreContextClassLoader(oldLdr); } @@ -274,7 +279,7 @@ public class HadoopV2Job implements HadoopJob { fullCtxClsQueue.add(cls); } - Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, + Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJobEx.class, HadoopJobId.class, UUID.class, DataInput.class); if (jobConfData == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java index 667ef1e..c878515 100644 
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java @@ -25,7 +25,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; import org.jetbrains.annotations.Nullable; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java index d328550..8b8a728 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java @@ -45,8 +45,8 @@ import org.apache.ignite.hadoop.io.TextPartiallyRawComparator; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import 
org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty; import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; @@ -165,7 +165,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { * @param locNodeId Local node ID. * @param jobConfDataInput DataInput for read JobConf. */ - public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId, + public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job, HadoopJobId jobId, @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { super(taskInfo, job); this.locNodeId = locNodeId; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java index 090b336..1035701 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java @@ -24,11 +24,11 @@ import java.io.ObjectOutput; import java.util.Collection; import java.util.Map; import java.util.UUID; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; +import org.apache.ignite.hadoop.HadoopMapReducePlan; import 
org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 9542372..9284c02 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -49,14 +49,14 @@ import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils; import org.apache.ignite.internal.processors.hadoop.HadoopComponent; import org.apache.ignite.internal.processors.hadoop.HadoopContext; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; +import org.apache.ignite.hadoop.HadoopMapReducePlan; 
+import org.apache.ignite.hadoop.HadoopMapReducePlanner; import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter; @@ -114,7 +114,7 @@ public class HadoopJobTracker extends HadoopComponent { private HadoopMapReducePlanner mrPlanner; /** All the known jobs. */ - private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJob>> jobs = new ConcurrentHashMap8<>(); + private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobEx>> jobs = new ConcurrentHashMap8<>(); /** Locally active jobs. */ private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>(); @@ -129,8 +129,8 @@ public class HadoopJobTracker extends HadoopComponent { /** Component busy lock. */ private GridSpinReadWriteLock busyLock; - /** Class to create HadoopJob instances from. */ - private Class<? extends HadoopJob> jobCls; + /** Class to create HadoopJobEx instances from. */ + private Class<? extends HadoopJobEx> jobCls; /** Closure to check result of async transform of system cache. */ private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() { @@ -158,7 +158,7 @@ public class HadoopJobTracker extends HadoopComponent { HadoopClassLoader ldr = ctx.kernalContext().hadoopHelper().commonClassLoader(); try { - jobCls = (Class<HadoopJob>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME); + jobCls = (Class<HadoopJobEx>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME); } catch (Exception ioe) { throw new IgniteCheckedException("Failed to load job class [class=" + @@ -310,7 +310,7 @@ public class HadoopJobTracker extends HadoopComponent { if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) throw new IgniteCheckedException("Failed to submit job. 
Job with the same ID already exists: " + jobId); - HadoopJob job = job(jobId, info); + HadoopJobEx job = job(jobId, info); HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); @@ -692,7 +692,7 @@ public class HadoopJobTracker extends HadoopComponent { try { if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { // Failover setup task. - HadoopJob job = job(jobId, meta.jobInfo()); + HadoopJobEx job = job(jobId, meta.jobInfo()); Collection<HadoopTaskInfo> setupTask = setupTask(jobId); @@ -818,7 +818,7 @@ public class HadoopJobTracker extends HadoopComponent { throws IgniteCheckedException { JobLocalState state = activeJobs.get(jobId); - HadoopJob job = job(jobId, meta.jobInfo()); + HadoopJobEx job = job(jobId, meta.jobInfo()); HadoopMapReducePlan plan = meta.mapReducePlan(); @@ -1048,7 +1048,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param job Job instance. * @return Collection of task infos. */ - private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) { + private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJobEx job) { UUID locNodeId = ctx.localNodeId(); HadoopJobId jobId = job.id(); @@ -1097,15 +1097,15 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job. * @throws IgniteCheckedException If failed. 
*/ - @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException { - GridFutureAdapter<HadoopJob> fut = jobs.get(jobId); + @Nullable public HadoopJobEx job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException { + GridFutureAdapter<HadoopJobEx> fut = jobs.get(jobId); - if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJob>())) != null) + if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJobEx>())) != null) return fut.get(); fut = jobs.get(jobId); - HadoopJob job = null; + HadoopJobEx job = null; try { if (jobInfo == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java deleted file mode 100644 index f01f72b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; - -/** - * Base class for map-reduce planners. - */ -public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner { - /** Injected grid. */ - @IgniteInstanceResource - protected Ignite ignite; - - /** Logger. */ - @SuppressWarnings("UnusedDeclaration") - @LoggerResource - protected IgniteLogger log; - - /** - * Create plan topology. - * - * @param nodes Topology nodes. - * @return Plan topology. 
- */ - protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) { - Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size()); - - Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size()); - Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size()); - - for (ClusterNode node : nodes) { - String macs = node.attribute(ATTR_MACS); - - HadoopMapReducePlanGroup grp = macsMap.get(macs); - - if (grp == null) { - grp = new HadoopMapReducePlanGroup(node, macs); - - macsMap.put(macs, grp); - } - else - grp.add(node); - - idToGrp.put(node.id(), grp); - - for (String host : node.addresses()) { - HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host); - - if (hostGrp == null) - hostToGrp.put(host, grp); - else - assert hostGrp == grp; - } - } - - return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp); - } - - - /** - * Groups nodes by host names. - * - * @param top Topology to group. - * @return Map. - */ - protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) { - Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); - - for (ClusterNode node : top) { - for (String host : node.hostNames()) { - Collection<UUID> nodeIds = grouped.get(host); - - if (nodeIds == null) { - // Expecting 1-2 nodes per host. - nodeIds = new ArrayList<>(2); - - grouped.put(host, nodeIds); - } - - nodeIds.add(node.id()); - } - } - - return grouped; - } -}