http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ProgressRetriever.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ProgressRetriever.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ProgressRetriever.java deleted file mode 100644 index 6be8147..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ProgressRetriever.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.hive.resources.jobs.atsJobs.TezVertexId; -import org.apache.ambari.view.hive.resources.jobs.rm.RMParser; -import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.ambari.view.hive.utils.SharedObjectsFactory; - -import java.util.List; -import java.util.Map; - -public class ProgressRetriever { - private final Progress progress; - private final Job job; - private final SharedObjectsFactory sharedObjects; - - public ProgressRetriever(Job job, SharedObjectsFactory sharedObjects) { - this.job = job; - this.sharedObjects = sharedObjects; - - this.progress = new Progress(); - } - - public Progress getProgress() { - jobCheck(); - - progress.dagProgress = sharedObjects.getRMParser().getDAGProgress( - job.getApplicationId(), job.getDagId()); - - List<TezVertexId> vertices = sharedObjects.getATSParser().getVerticesForDAGId(job.getDagId()); - progress.vertexProgresses = sharedObjects.getRMParser().getDAGVerticesProgress(job.getApplicationId(), job.getDagId(), vertices); - - return progress; - } - - public void jobCheck() { - if (job.getApplicationId() == null || job.getApplicationId().isEmpty()) { - throw new ServiceFormattedException("E070 ApplicationId is not defined yet"); - } - if (job.getDagId() == null || job.getDagId().isEmpty()) { - throw new ServiceFormattedException("E080 DagID is not defined yet"); - } - } - - public static class Progress { - public Double dagProgress; - public List<RMParser.VertexProgress> vertexProgresses; - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ResultsPaginationController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ResultsPaginationController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ResultsPaginationController.java deleted file mode 100644 index cc2ff42..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/ResultsPaginationController.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.client.ColumnDescription; -import org.apache.ambari.view.hive.client.HiveClientException; -import org.apache.ambari.view.hive.client.Cursor; -import org.apache.ambari.view.hive.utils.HiveClientFormattedException; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.commons.collections4.map.PassiveExpiringMap; - -import javax.ws.rs.core.Response; -import java.lang.Object; -import java.lang.String; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.List; -import java.util.concurrent.Callable; - -/** - * Results Pagination Controller - * Persists cursors for result sets - */ -public class ResultsPaginationController { - public static final String DEFAULT_SEARCH_ID = "default"; - private static Map<String, ResultsPaginationController> viewSingletonObjects = new HashMap<String, ResultsPaginationController>(); - public static ResultsPaginationController getInstance(ViewContext context) { - if (!viewSingletonObjects.containsKey(context.getInstanceName())) - viewSingletonObjects.put(context.getInstanceName(), new ResultsPaginationController()); - return viewSingletonObjects.get(context.getInstanceName()); - } - - public ResultsPaginationController() { - } - - private static final long EXPIRING_TIME = 10*60*1000; // 10 minutes - private static final int DEFAULT_FETCH_COUNT = 50; - private Map<String, Cursor> resultsCache; - - public static class CustomTimeToLiveExpirationPolicy extends PassiveExpiringMap.ConstantTimeToLiveExpirationPolicy<String, Cursor> { - public CustomTimeToLiveExpirationPolicy(long timeToLiveMillis) { - super(timeToLiveMillis); - } - - @Override - public long expirationTime(String key, Cursor value) { - if (key.startsWith("$")) { - return -1; //never expire - } - return super.expirationTime(key, value); - } - } - - private Map<String, Cursor> getResultsCache() { - if (resultsCache == null) { - PassiveExpiringMap<String, Cursor> resultsCacheExpiringMap = - new PassiveExpiringMap<String, Cursor>(new CustomTimeToLiveExpirationPolicy(EXPIRING_TIME)); - resultsCache = Collections.synchronizedMap(resultsCacheExpiringMap); - } - return resultsCache; - } - - /** - * Renew timer of cache entry. - * @param key name/id of results request - * @return false if entry not found; true if renew was ok - */ - public boolean keepAlive(String key, String searchId) { - if (searchId == null) - searchId = DEFAULT_SEARCH_ID; - String effectiveKey = key + "?" + searchId; - if (!getResultsCache().containsKey(effectiveKey)) { - return false; - } - Cursor cursor = getResultsCache().get(effectiveKey); - getResultsCache().put(effectiveKey, cursor); - return true; - } - - private Cursor getResultsSet(String key, Callable<Cursor> makeResultsSet) { - if (!getResultsCache().containsKey(key)) { - Cursor resultSet = null; - try { - resultSet = makeResultsSet.call(); - } catch (HiveClientException ex) { - throw new HiveClientFormattedException(ex); - } catch (Exception ex) { - throw new ServiceFormattedException(ex.getMessage(), ex); - } - getResultsCache().put(key, resultSet); - } - - return getResultsCache().get(key); - } - - public Response.ResponseBuilder request(String key, String searchId, boolean canExpire, String fromBeginning, Integer count, String format, Callable<Cursor> makeResultsSet) throws HiveClientException { - if (searchId == null) - searchId = DEFAULT_SEARCH_ID; - key = key + "?" + searchId; - if (!canExpire) - key = "$" + key; - if (fromBeginning != null && fromBeginning.equals("true") && getResultsCache().containsKey(key)) - getResultsCache().remove(key); - Cursor resultSet = getResultsSet(key, makeResultsSet); - - if (count == null) - count = DEFAULT_FETCH_COUNT; - - ArrayList<ColumnDescription> schema = resultSet.getSchema(); - ArrayList<Object[]> rows = new ArrayList<Object[]>(count); - int read = resultSet.readRaw(rows, count); - if(format != null && format.equalsIgnoreCase("d3")) { - List<Map<String,Object>> results = new ArrayList<Map<String,Object>>(); - for(int i=0; i<rows.size(); i++) { - Object[] row = rows.get(i); - Map<String, Object> keyValue = new HashMap<String, Object>(row.length); - for(int j=0; j<row.length; j++) { - //Replace dots in schema with underscore - String schemaName = schema.get(j).getName(); - keyValue.put(schemaName.replace('.','_'), row[j]); - } - results.add(keyValue); - } - return Response.ok(results); - } else { - ResultsResponse resultsResponse = new ResultsResponse(); - resultsResponse.setSchema(schema); - resultsResponse.setRows(rows); - resultsResponse.setReadCount(read); - resultsResponse.setHasNext(resultSet.hasNext()); - // resultsResponse.setSize(resultSet.size()); - resultsResponse.setOffset(resultSet.getOffset()); - resultsResponse.setHasResults(true); - return Response.ok(resultsResponse); - } - } - - public static Response.ResponseBuilder emptyResponse() { - ResultsResponse resultsResponse = new ResultsResponse(); - resultsResponse.setSchema(new ArrayList<ColumnDescription>()); - resultsResponse.setRows(new ArrayList<Object[]>()); - resultsResponse.setReadCount(0); - resultsResponse.setHasNext(false); - resultsResponse.setOffset(0); - resultsResponse.setHasResults(false); - return Response.ok(resultsResponse); - } - - private static class ResultsResponse { - private ArrayList<ColumnDescription> schema; - private ArrayList<String[]> rows; - private int readCount; - private boolean hasNext; - private long offset; - private boolean hasResults; - - public void setSchema(ArrayList<ColumnDescription> schema) { - this.schema = schema; - } - - public ArrayList<ColumnDescription> getSchema() { - return schema; - } - - public void setRows(ArrayList<Object[]> rows) { - if( null == rows ){ - this.rows = null; - } - this.rows = new ArrayList<String[]>(rows.size()); - for(Object[] row : rows ){ - String[] strs = new String[row.length]; - for( int colNum = 0 ; colNum < row.length ; colNum++ ){ - String value = String.valueOf(row[colNum]); - if(row[colNum] != null && (value.isEmpty() || value.equalsIgnoreCase("null"))){ - strs[colNum] = String.format("\"%s\"",value); - }else{ - strs[colNum] = value; - } - } - this.rows.add(strs); - } - } - - public ArrayList<String[]> getRows() { - return rows; - } - - public void setReadCount(int readCount) { - this.readCount = readCount; - } - - public int getReadCount() { - return readCount; - } - - public void setHasNext(boolean hasNext) { - this.hasNext = hasNext; - } - - public boolean isHasNext() { - return hasNext; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public boolean getHasResults() { - return hasResults; - } - - public void setHasResults(boolean hasResults) { - this.hasResults = hasResults; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java deleted file mode 100644 index a1d87ad..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs; - -import org.apache.ambari.view.hive.persistence.utils.Indexed; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.apache.hive.service.cli.thrift.THandleIdentifier; -import org.apache.hive.service.cli.thrift.TOperationHandle; -import org.apache.hive.service.cli.thrift.TOperationType; - -import java.lang.reflect.InvocationTargetException; -import java.util.Map; - -/** - * Bean to represent TOperationHandle stored in DB - */ -public class StoredOperationHandle implements Indexed { - private boolean hasResultSet; - private double modifiedRowCount; - private int operationType; - private String guid; - private String secret; - - private String jobId; - - private String id; - - public StoredOperationHandle() {} - public StoredOperationHandle(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException { - for (Map.Entry<String, Object> entry : stringObjectMap.entrySet()) { - try { - PropertyUtils.setProperty(this, entry.getKey(), entry.getValue()); - } catch (NoSuchMethodException e) { - //do nothing, skip - } - } - } - - - public static StoredOperationHandle buildFromTOperationHandle(TOperationHandle handle) { - StoredOperationHandle storedHandle = new StoredOperationHandle(); - //bool hasResultSet - storedHandle.setHasResultSet(handle.isHasResultSet()); - //optional double modifiedRowCount - storedHandle.setModifiedRowCount(handle.getModifiedRowCount()); - //TOperationType operationType - storedHandle.setOperationType(handle.getOperationType().getValue()); - //THandleIdentifier operationId - storedHandle.setGuid(Hex.encodeHexString(handle.getOperationId().getGuid())); - storedHandle.setSecret(Hex.encodeHexString(handle.getOperationId().getSecret())); - return storedHandle; - } - - public TOperationHandle toTOperationHandle() { - TOperationHandle handle = new TOperationHandle(); - handle.setHasResultSet(isHasResultSet()); - handle.setModifiedRowCount(getModifiedRowCount()); - handle.setOperationType(TOperationType.findByValue(getOperationType())); - THandleIdentifier identifier = new THandleIdentifier(); - try { - identifier.setGuid(Hex.decodeHex(getGuid().toCharArray())); - identifier.setSecret(Hex.decodeHex(getSecret().toCharArray())); - } catch (DecoderException e) { - throw new ServiceFormattedException("E060 Wrong identifier of OperationHandle is stored in DB"); - } - handle.setOperationId(identifier); - return handle; - } - - public boolean isHasResultSet() { - return hasResultSet; - } - - public void setHasResultSet(boolean hasResultSet) { - this.hasResultSet = hasResultSet; - } - - public double getModifiedRowCount() { - return modifiedRowCount; - } - - public void setModifiedRowCount(double modifiedRowCount) { - this.modifiedRowCount = modifiedRowCount; - } - - public int getOperationType() { - return operationType; - } - - public void setOperationType(int operationType) { - this.operationType = operationType; - } - - public String getGuid() { - return guid; - } - - public void setGuid(String guid) { - this.guid = guid; - } - - public String getSecret() { - return secret; - } - - public void setSecret(String secret) { - this.secret = secret; - } - - public String getJobId() { - return jobId; - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } - - @Override - public String getId() { - return id; - } - - @Override - public void setId(String id) { - this.id = id; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java deleted file mode 100644 index b145df2..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; - -/** - * Parser of ATS responses - */ -public class ATSParser implements IATSParser { - protected final static Logger LOG = - LoggerFactory.getLogger(ATSParser.class); - - private ATSRequestsDelegate delegate; - - private static final long MillisInSecond = 1000L; - - public ATSParser(ATSRequestsDelegate delegate) { - this.delegate = delegate; - } - - /** - * returns all HiveQueryIDs from ATS for the given user. - * @param username - * @return - */ - @Override - public List<HiveQueryId> getHiveQueryIdsForUser(String username) { - JSONObject entities = delegate.hiveQueryIdsForUser(username); - return parseHqidJsonFromATS(entities); - } - - /** - * parses the JSONArray or hive query IDs - * @param entities: should contain 'entities' element as JSONArray - * @return - */ - private List<HiveQueryId> parseHqidJsonFromATS(JSONObject entities) { - JSONArray jobs = (JSONArray) entities.get("entities"); - - return getHqidListFromJsonArray(jobs); - } - - /** - * parses List of HiveQueryIds from JSON - * @param jobs - * @return - */ - private List<HiveQueryId> getHqidListFromJsonArray(JSONArray jobs) { - List<HiveQueryId> parsedJobs = new LinkedList<>(); - for (Object job : jobs) { - try { - HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job); - parsedJobs.add(parsedJob); - } catch (Exception ex) { - LOG.error("Error while parsing ATS job", ex); - } - } - - return parsedJobs; - } - - @Override - public List<TezVertexId> getVerticesForDAGId(String dagId) { - JSONObject entities = delegate.tezVerticesListForDAG(dagId); - JSONArray vertices = (JSONArray) entities.get("entities"); - - List<TezVertexId> parsedVertices = new LinkedList<TezVertexId>(); - for(Object vertex : vertices) { - try { - TezVertexId parsedVertex = parseVertex((JSONObject) vertex); - parsedVertices.add(parsedVertex); - } catch (Exception ex) { - LOG.error("Error while parsing the vertex", ex); - } - } - - return parsedVertices; - } - - @Override - public HiveQueryId getHiveQueryIdByOperationId(String guidString) { - JSONObject entities = delegate.hiveQueryIdByOperationId(guidString); - return getHiveQueryIdFromJson(entities); - } - - private HiveQueryId getHiveQueryIdFromJson(JSONObject entities) { - JSONArray jobs = (JSONArray) entities.get("entities"); - - if (jobs.size() == 0) { - return new HiveQueryId(); - } - - return parseAtsHiveJob((JSONObject) jobs.get(0)); - } - - /** - * returns the hive entity from ATS. empty object if not found. - * - * @param hiveId: the entityId of the hive - * @return: empty entity if not found else HiveQueryId - */ - @Override - public HiveQueryId getHiveQueryIdByHiveEntityId(String hiveId) { - JSONObject entity = delegate.hiveQueryEntityByEntityId(hiveId); - return parseAtsHiveJob(entity); - } - - @Override - public TezDagId getTezDAGByName(String name) { - JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities"); - return parseTezDag(tezDagEntities); - } - - @Override - public TezDagId getTezDAGByEntity(String entity) { - JSONArray tezDagEntities = (JSONArray) delegate.tezDagByEntity(entity).get("entities"); - return parseTezDag(tezDagEntities); - } - - /** - * fetches the HIVE_QUERY_ID from ATS for given user between given time period - * - * @param username: username for which to fetch hive query IDs - * @param startTime: time in miliseconds, inclusive - * @param endTime: time in miliseconds, exclusive - * @return: List of HIVE_QUERY_ID - */ - @Override - public List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime) { - JSONObject entities = delegate.hiveQueryIdsForUserByTime(username, startTime, endTime); - return parseHqidJsonFromATS(entities); - } - - @Override - public List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveIds) { - List<HiveQueryId> hiveQueryIds = new LinkedList<>(); - for (String id : hiveIds) { - HiveQueryId hqi = this.getHiveQueryIdByHiveEntityId(id); - if (null != hqi.entity) { - hiveQueryIds.add(hqi); - } - } - return hiveQueryIds; - } - - private TezDagId parseTezDag(JSONArray tezDagEntities) { - assert tezDagEntities.size() <= 1; - if (tezDagEntities.size() == 0) { - return new TezDagId(); - } - JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0); - - TezDagId parsedDag = new TezDagId(); - JSONArray applicationIds = (JSONArray) ((JSONObject) tezDagEntity.get("primaryfilters")).get("applicationId"); - parsedDag.entity = (String) tezDagEntity.get("entity"); - parsedDag.applicationId = (String) applicationIds.get(0); - parsedDag.status = (String) ((JSONObject) tezDagEntity.get("otherinfo")).get("status"); - return parsedDag; - } - - private HiveQueryId parseAtsHiveJob(JSONObject job) { - HiveQueryId parsedJob = new HiveQueryId(); - - parsedJob.entity = (String) job.get("entity"); - parsedJob.url = delegate.hiveQueryIdDirectUrl((String) job.get("entity")); - parsedJob.starttime = ((Long) job.get("starttime")); - - JSONObject primaryfilters = (JSONObject) job.get("primaryfilters"); - JSONArray operationIds = (JSONArray) primaryfilters.get("operationid"); - if (operationIds != null) { - parsedJob.operationId = (String) (operationIds).get(0); - } - JSONArray users = (JSONArray) primaryfilters.get("user"); - if (users != null) { - parsedJob.user = (String) (users).get(0); - } - - JSONObject lastEvent = getLastEvent(job); - long lastEventTimestamp = ((Long) lastEvent.get("timestamp")); - - parsedJob.duration = (lastEventTimestamp - parsedJob.starttime) / MillisInSecond; - - JSONObject otherinfo = (JSONObject) job.get("otherinfo"); - if (otherinfo.get("QUERY") != null) { // workaround for HIVE-10829 - JSONObject query = (JSONObject) JSONValue.parse((String) otherinfo.get("QUERY")); - - parsedJob.query = (String) query.get("queryText"); - JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS"); - - List<String> dagIds = new LinkedList<String>(); - List<JSONObject> stagesList = new LinkedList<JSONObject>(); - - for (Object key : stages.keySet()) { - JSONObject stage = (JSONObject) stages.get(key); - if (stage.get("Tez") != null) { - String dagId = (String) ((JSONObject) stage.get("Tez")).get("DagName:"); - dagIds.add(dagId); - } - stagesList.add(stage); - } - parsedJob.dagNames = dagIds; - parsedJob.stages = stagesList; - } - - if (otherinfo.get("VERSION") != null) { - parsedJob.version = (Long) otherinfo.get("VERSION"); - } - return parsedJob; - } - - private TezVertexId parseVertex(JSONObject vertex) { - TezVertexId tezVertexId = new TezVertexId(); - tezVertexId.entity = (String)vertex.get("entity"); - JSONObject otherinfo = (JSONObject)vertex.get("otherinfo"); - if (otherinfo != null) - tezVertexId.vertexName = (String)otherinfo.get("vertexName"); - return tezVertexId; - } - - private JSONObject getLastEvent(JSONObject atsEntity) { - JSONArray events = (JSONArray) atsEntity.get("events"); - return (JSONObject) events.get(0); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java deleted file mode 100644 index f035f75..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.utils.ambari.AmbariApi; - -import java.util.HashMap; -import java.util.Map; - -public class ATSParserFactory { - - private ViewContext context; - private final AmbariApi ambariApi; - - public ATSParserFactory(ViewContext context) { - this.context = context; - this.ambariApi = new AmbariApi(context); - } - - public ATSParser getATSParser() { - ATSRequestsDelegateImpl delegate = new ATSRequestsDelegateImpl(context, getATSUrl()); - return new ATSParser(delegate); - } - - public String getATSUrl() { - return ambariApi.getServices().getTimelineServerUrl(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java deleted file mode 100644 index 6de5773..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -import org.json.simple.JSONObject; - -public interface ATSRequestsDelegate { - String hiveQueryIdDirectUrl(String entity); - - String hiveQueryIdOperationIdUrl(String operationId); - - String tezDagDirectUrl(String entity); - - String tezDagNameUrl(String name); - - String tezVerticesListForDAGUrl(String dagId); - - JSONObject hiveQueryIdsForUser(String username); - - JSONObject hiveQueryIdByOperationId(String operationId); - - JSONObject tezDagByName(String name); - - JSONObject tezVerticesListForDAG(String dagId); - - JSONObject tezDagByEntity(String entity); - - JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime); - - JSONObject hiveQueryEntityByEntityId(String hiveEntityId); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java deleted file mode 100644 index f52c9da..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.commons.io.IOUtils; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; - -public class ATSRequestsDelegateImpl implements ATSRequestsDelegate { - protected final static Logger LOG = - LoggerFactory.getLogger(ATSRequestsDelegateImpl.class); - public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }"; - - private ViewContext context; - private String atsUrl; - - public ATSRequestsDelegateImpl(ViewContext context, String atsUrl) { - this.context = context; - this.atsUrl = addProtocolIfMissing(atsUrl); - } - - private String addProtocolIfMissing(String atsUrl) { - if (!atsUrl.matches("^[^:]+://.*$")) - atsUrl = "http://" + atsUrl; - return atsUrl; - } - - @Override - public String hiveQueryIdDirectUrl(String entity) { - return atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID/" + entity; - } - - @Override - public String hiveQueryIdOperationIdUrl(String operationId) { - // ATS parses operationId started with digit as integer and not returns the response. - // Quotation prevents this. - return atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=operationid:%22" + operationId + "%22"; - } - - @Override - public String tezDagDirectUrl(String entity) { - return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID/" + entity; - } - - @Override - public String tezDagNameUrl(String name) { - return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=dagName:" + name; - } - - @Override - public String tezVerticesListForDAGUrl(String dagId) { - return atsUrl + "/ws/v1/timeline/TEZ_VERTEX_ID?primaryFilter=TEZ_DAG_ID:" + dagId; - } - - @Override - public JSONObject hiveQueryIdsForUser(String username) { - String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=requestuser:" + username; - String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }"); - return (JSONObject) JSONValue.parse(response); - } - - @Override - public JSONObject hiveQueryIdByOperationId(String operationId) { - String hiveQueriesListUrl = hiveQueryIdOperationIdUrl(operationId); - String response = readFromWithDefault(hiveQueriesListUrl, EMPTY_ENTITIES_JSON); - return (JSONObject) JSONValue.parse(response); - } - - @Override - public JSONObject tezDagByName(String name) { - String tezDagUrl = tezDagNameUrl(name); - String response = readFromWithDefault(tezDagUrl, EMPTY_ENTITIES_JSON); - return (JSONObject) JSONValue.parse(response); - } - - @Override - public JSONObject tezDagByEntity(String entity) { - String tezDagEntityUrl = tezDagEntityUrl(entity); - String response = readFromWithDefault(tezDagEntityUrl, EMPTY_ENTITIES_JSON); - return (JSONObject) JSONValue.parse(response); - } - - /** - * fetches the HIVE_QUERY_ID from ATS for given user between given time period - * @param username: username for which to fetch hive query IDs - * @param startTime: time in miliseconds, inclusive - * @param endTime: time in miliseconds, exclusive - * @return - */ - @Override - public JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime) { - StringBuilder url = new StringBuilder(); - url.append(atsUrl).append("/ws/v1/timeline/HIVE_QUERY_ID?") - .append("windowStart=").append(startTime) - .append("&windowEnd=").append(endTime) - .append("&primaryFilter=requestuser:").append(username); - String hiveQueriesListUrl = url.toString(); - - String response = readFromWithDefault(hiveQueriesListUrl, EMPTY_ENTITIES_JSON); - return (JSONObject) JSONValue.parse(response); - } - - @Override - public JSONObject hiveQueryEntityByEntityId(String hiveEntityId) { - StringBuilder url = new StringBuilder(); - url.append(atsUrl).append("/ws/v1/timeline/HIVE_QUERY_ID/").append(hiveEntityId); - String hiveQueriesListUrl = url.toString(); - String response = readFromWithDefault(hiveQueriesListUrl, EMPTY_ENTITIES_JSON); - return (JSONObject) JSONValue.parse(response); - } - - private String tezDagEntityUrl(String entity) { - return atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=callerId:" + entity; - } - - public boolean checkATSStatus() throws IOException { - String url = atsUrl + "/ws/v1/timeline/"; - InputStream responseInputStream = context.getURLStreamProvider().readAsCurrent(url, "GET", - (String)null, new HashMap<String, String>()); - IOUtils.toString(responseInputStream); - return true; - } - - @Override - public JSONObject tezVerticesListForDAG(String dagId) { - String response = readFromWithDefault(tezVerticesListForDAGUrl(dagId), "{ \"entities\" : [ ] }"); - return (JSONObject) JSONValue.parse(response); - } - - - - protected String readFromWithDefault(String atsUrl, String defaultResponse) { - String response; - try { - InputStream responseInputStream = context.getURLStreamProvider().readAsCurrent(atsUrl, "GET", - (String)null, new HashMap<String, String>()); - response = IOUtils.toString(responseInputStream); - } catch (IOException e) { - LOG.error("Error while reading from ATS", e); - response = defaultResponse; - } - return response; - } - - public String getAtsUrl() { - return atsUrl; - } - - public void setAtsUrl(String atsUrl) { - this.atsUrl = atsUrl; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java deleted file mode 100644 index bb81fef..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -import org.json.simple.JSONObject; - -import java.util.List; - -public class HiveQueryId { - public static long ATS_15_RESPONSE_VERSION = 2; // version returned from ATS 1.5 release - - public String url; - - public String entity; - public String query; - - public List<String> dagNames; - - public List<JSONObject> stages; - - public long starttime; - public long duration; - public String operationId; - public String user; - public long version; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java deleted file mode 100644 index 547dfec..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -import java.util.List; - -public interface IATSParser { - List<HiveQueryId> getHiveQueryIdsForUser(String username); - - List<TezVertexId> getVerticesForDAGId(String dagId); - - HiveQueryId getHiveQueryIdByOperationId(String guidString); - - TezDagId getTezDAGByName(String name); - - TezDagId getTezDAGByEntity(String entity); - - List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime); - - HiveQueryId getHiveQueryIdByHiveEntityId(String hiveEntityId); - - List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveEntityIds); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java deleted file mode 100644 index a814286..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -public class TezDagId { - public static final String STATUS_UNKNOWN = "UNKNOWN"; - public String applicationId = ""; - public String entity = ""; - public String status = STATUS_UNKNOWN; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezVertexId.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezVertexId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezVertexId.java deleted file mode 100644 index 34dd996..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezVertexId.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.atsJobs; - -public class TezVertexId { - public String entity; - public String vertexName; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParser.java deleted file mode 100644 index b39be44..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParser.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.rm; - -import org.apache.ambari.view.hive.resources.jobs.atsJobs.TezVertexId; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * Parser of Resource Manager responses - */ -public class RMParser { - protected final static Logger LOG = - LoggerFactory.getLogger(RMParser.class); - private RMRequestsDelegate delegate; - - public RMParser(RMRequestsDelegate delegate) { - this.delegate = delegate; - } - - /** - * Progress of DAG - * @param appId App Id - * @param dagId DAG Id - * @return progress of DAG - */ - public Double getDAGProgress(String appId, String dagId) { - String dagIdx = parseDagIdIndex(dagId); - JSONObject progresses = delegate.dagProgress(appId, dagIdx); - - double dagProgressValue; - if (progresses != null) { - JSONObject dagProgress = (JSONObject) progresses.get("dagProgress"); - dagProgressValue = (Double) (dagProgress.get("progress")); - } else { - LOG.error("Error while retrieving progress of " + appId + ":" + dagId + ". 0 assumed."); - dagProgressValue = 0; - } - return dagProgressValue; - } - - /** - * Progress of vertices - * @param appId App Id - * @param dagId DAG Id - * @param vertices vertices list - * @return list of vertices - */ - public List<VertexProgress> getDAGVerticesProgress(String appId, String dagId, List<TezVertexId> vertices) { - String dagIdx = parseDagIdIndex(dagId); - - Map<String, String> vertexIdToEntityMapping = new HashMap<String, String>(); - StringBuilder builder = new StringBuilder(); - if (vertices.size() > 0) { - for (TezVertexId vertexId : vertices) { - String[] parts = vertexId.entity.split("_"); - String vertexIdx = parts[parts.length - 1]; - builder.append(vertexIdx).append(","); - - vertexIdToEntityMapping.put(vertexId.entity, vertexId.vertexName); - } - builder.setLength(builder.length() - 1); // remove last comma - } - - String commaSeparatedVertices = builder.toString(); - - List<VertexProgress> parsedVertexProgresses = new LinkedList<VertexProgress>(); - JSONObject vertexProgressesResponse = delegate.verticesProgress( - appId, dagIdx, commaSeparatedVertices); - if (vertexProgressesResponse == null) { - LOG.error("Error while retrieving progress of vertices " + - appId + ":" + dagId + ":" + commaSeparatedVertices + ". 0 assumed for all vertices."); - for (TezVertexId vertexId : vertices) { - VertexProgress vertexProgressInfo = new VertexProgress(); - vertexProgressInfo.name = vertexId.vertexName; - vertexProgressInfo.progress = 0.0; - parsedVertexProgresses.add(vertexProgressInfo); - } - return parsedVertexProgresses; - } - JSONArray vertexProgresses = (JSONArray) vertexProgressesResponse.get("vertexProgresses"); - - for (Object vertex : vertexProgresses) { - JSONObject jsonObject = (JSONObject) vertex; - - VertexProgress vertexProgressInfo = new VertexProgress(); - vertexProgressInfo.id = (String) jsonObject.get("id"); - vertexProgressInfo.name = vertexIdToEntityMapping.get(vertexProgressInfo.id); - vertexProgressInfo.progress = (Double) jsonObject.get("progress"); - - parsedVertexProgresses.add(vertexProgressInfo); - } - return parsedVertexProgresses; - } - - public String parseDagIdIndex(String dagId) { - String[] dagIdParts = dagId.split("_"); - return dagIdParts[dagIdParts.length - 1]; - } - - public static class VertexProgress { - public String id; - public String name; - public Double progress; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParserFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParserFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParserFactory.java deleted file mode 100644 index 260b464..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMParserFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.rm; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.ambari.view.utils.ambari.AmbariApi; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RMParserFactory { - protected final static Logger LOG = - LoggerFactory.getLogger(RMParserFactory.class); - - private final ViewContext context; - private final AmbariApi ambariApi; - - public RMParserFactory(ViewContext context) { - this.context = context; - this.ambariApi = new AmbariApi(context); - } - - public RMParser getRMParser() { - String rmUrl = getRMUrl(); - - RMRequestsDelegate delegate = new RMRequestsDelegateImpl(context, rmUrl); - return new RMParser(delegate); - } - - public String getRMUrl() { - return ambariApi.getServices().getRMUrl(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegate.java deleted file mode 100644 index 6114b34..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegate.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.rm; - -import org.json.simple.JSONObject; - -public interface RMRequestsDelegate { - String dagProgressUrl(String appId, String dagIdx); - - String verticesProgressUrl(String appId, String dagIdx, String vertices); - - JSONObject dagProgress(String appId, String dagIdx); - - JSONObject verticesProgress(String appId, String dagIdx, String commaSeparatedVertices); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegateImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegateImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegateImpl.java deleted file mode 100644 index 087ef68..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/rm/RMRequestsDelegateImpl.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.rm; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.ambari.view.utils.ambari.AmbariApi; -import org.apache.commons.io.IOUtils; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; - -public class RMRequestsDelegateImpl implements RMRequestsDelegate { - protected final static Logger LOG = - LoggerFactory.getLogger(RMRequestsDelegateImpl.class); - public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }"; - - private ViewContext context; - private String rmUrl; - - public RMRequestsDelegateImpl(ViewContext context, String rmUrl) { - this.context = context; - this.rmUrl = rmUrl; - } - - @Override - public String dagProgressUrl(String appId, String dagIdx) { - return rmUrl + String.format("/proxy/%s/ws/v1/tez/dagProgress?dagID=%s", appId, dagIdx); - } - - @Override - public String verticesProgressUrl(String appId, String dagIdx, String vertices) { - return rmUrl + String.format("/proxy/%s/ws/v1/tez/vertexProgresses?dagID=%s&vertexID=%s", appId, dagIdx, vertices); - } - - @Override - public JSONObject dagProgress(String appId, String dagIdx) { - String url = dagProgressUrl(appId, dagIdx); - String response; - try { - InputStream responseInputStream = context.getURLStreamProvider().readFrom(url, "GET", - (String)null, new HashMap<String, String>()); - response = IOUtils.toString(responseInputStream); - } catch (IOException e) { - throw new ServiceFormattedException( - String.format("R010 DAG %s in app %s not found or ResourceManager is unreachable", dagIdx, appId)); - } - return (JSONObject) JSONValue.parse(response); - } - - @Override - public JSONObject verticesProgress(String appId, String dagIdx, String commaSeparatedVertices) { - String url = verticesProgressUrl(appId, dagIdx, commaSeparatedVertices); - String response; - try { - InputStream responseInputStream = context.getURLStreamProvider().readFrom(url, "GET", - (String)null, new HashMap<String, String>()); - response = IOUtils.toString(responseInputStream); - } catch (IOException e) { - throw new ServiceFormattedException( - String.format("R020 DAG %s in app %s not found or ResourceManager is unreachable", dagIdx, appId)); - } - return (JSONObject) JSONValue.parse(response); - } - - protected String readFromWithDefault(String url, String defaultResponse) { - String response; - try { - InputStream responseInputStream = context.getURLStreamProvider().readFrom(url, "GET", - (String)null, new HashMap<String, String>()); - response = IOUtils.toString(responseInputStream); - } catch (IOException e) { - LOG.error("Error while reading from RM", e); - response = defaultResponse; - } - return response; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java deleted file mode 100644 index 89fbb85..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.viewJobs; - -public interface IJobControllerFactory { - JobController createControllerForJob(Job job); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java deleted file mode 100644 index 98d6589..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.viewJobs; - - -import org.apache.ambari.view.hive.persistence.utils.Indexed; -import org.apache.ambari.view.hive.persistence.utils.PersonalResource; - -import java.beans.Transient; -import java.io.Serializable; - -/** - * Interface for Job bean to create Proxy for it - */ -public interface Job extends Serializable,Indexed,PersonalResource { - String JOB_STATE_UNKNOWN = "Unknown"; - String JOB_STATE_INITIALIZED = "Initialized"; - String JOB_STATE_RUNNING = "Running"; - String JOB_STATE_FINISHED = "Succeeded"; - String JOB_STATE_CANCELED = "Canceled"; - String JOB_STATE_CLOSED = "Closed"; - String JOB_STATE_ERROR = "Error"; - String JOB_STATE_PENDING = "Pending"; - - @Transient - String getHiveQueryId(); - - @Transient - void setHiveQueryId(String hiveQueryId); - - String getId(); - - void setId(String id); - - String getOwner(); - - void setOwner(String owner); - - String getTitle(); - - void setTitle(String title); - - String getQueryFile(); - - void setQueryFile(String queryFile); - - Long getDateSubmitted(); - - void setDateSubmitted(Long dateSubmitted); - - Long getDuration(); - - void setDuration(Long duration); - - String getStatus(); - - void setStatus(String status); - - String getForcedContent(); - - void setForcedContent(String forcedContent); - - String getQueryId(); - - void setQueryId(String queryId); - - String getStatusDir(); - - void setStatusDir(String statusDir); - - String getDataBase(); - - void setDataBase(String dataBase); - - String getLogFile(); - - void setLogFile(String logFile); - - String getConfFile(); - - void setConfFile(String confFile); - - String getApplicationId(); - - void setApplicationId(String applicationId); - - String getDagName(); - - void setDagName(String dagName); - - String getDagId(); - - void setDagId(String dagId); - - String getSessionTag(); - - void setSessionTag(String sessionTag); - - String getSqlState(); - - void setSqlState(String sqlState); - - String getStatusMessage(); - - void setStatusMessage(String message); - - String getReferrer(); - - void setReferrer(String referrer); - - String getGlobalSettings(); - - void setGlobalSettings(String globalSettings); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java deleted file mode 100644 index 3922839..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.viewJobs; - -import org.apache.ambari.view.hive.client.Cursor; -import org.apache.ambari.view.hive.client.HiveClientException; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.NoOperationStatusSetException; -import org.apache.ambari.view.hive.resources.jobs.OperationHandleController; - -public interface JobController { - - OperationHandleController.OperationStatus getStatus() throws ItemNotFound, HiveClientException, NoOperationStatusSetException; - - void submit(); - - void cancel() throws ItemNotFound; - - Job getJob(); - - /** - * Use carefully. Returns unproxied bean object - * @return unproxied bean object - */ - Job getJobPOJO(); - - Cursor getResults() throws ItemNotFound; - boolean hasResults() throws ItemNotFound; - - void afterCreation(); - - void update(); - - boolean isModified(); - - void clearModified(); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java deleted file mode 100644 index a2790d1..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.viewJobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.utils.SharedObjectsFactory; - -public class JobControllerFactory implements IJobControllerFactory { - private SharedObjectsFactory sharedObjectsFactory; - private ViewContext context; - - public JobControllerFactory(ViewContext context, SharedObjectsFactory sharedObjectsFactory) { - this.sharedObjectsFactory = sharedObjectsFactory; - this.context = context; - } - - @Override - public JobController createControllerForJob(Job job) { - return new JobControllerImpl(context, job, - sharedObjectsFactory.getOperationHandleControllerFactory(), - sharedObjectsFactory.getSavedQueryResourceManager(), - sharedObjectsFactory.getATSParser(), - sharedObjectsFactory.getHdfsApi()); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e423a65e/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java deleted file mode 100644 index a408619..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java +++ /dev/null @@ -1,401 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.jobs.viewJobs; - -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive.client.Cursor; -import org.apache.ambari.view.hive.client.HiveClientException; -import org.apache.ambari.view.hive.client.HiveClientRuntimeException; -import org.apache.ambari.view.hive.client.HiveErrorStatusException; -import org.apache.ambari.view.hive.client.UserLocalConnection; -import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.ConnectionController; -import org.apache.ambari.view.hive.resources.jobs.LogParser; -import org.apache.ambari.view.hive.resources.jobs.ModifyNotificationDelegate; -import org.apache.ambari.view.hive.resources.jobs.ModifyNotificationInvocationHandler; -import org.apache.ambari.view.hive.resources.jobs.NoOperationStatusSetException; -import org.apache.ambari.view.hive.resources.jobs.OperationHandleController; -import org.apache.ambari.view.hive.resources.jobs.OperationHandleControllerFactory; -import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser; -import org.apache.ambari.view.hive.resources.savedQueries.SavedQuery; -import org.apache.ambari.view.hive.resources.savedQueries.SavedQueryResourceManager; -import org.apache.ambari.view.hive.utils.BadRequestFormattedException; -import org.apache.ambari.view.hive.utils.FilePaginator; -import org.apache.ambari.view.hive.utils.HiveClientFormattedException; -import org.apache.ambari.view.hive.utils.MisconfigurationFormattedException; -import org.apache.ambari.view.hive.utils.ServiceFormattedException; -import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.ambari.view.utils.hdfs.HdfsApiException; -import org.apache.ambari.view.utils.hdfs.HdfsUtil; -import org.apache.hive.service.cli.thrift.TSessionHandle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Proxy; -import java.text.SimpleDateFormat; -import java.util.Date; - -public class JobControllerImpl implements JobController, ModifyNotificationDelegate { - private final static Logger LOG = - LoggerFactory.getLogger(JobControllerImpl.class); - - private ViewContext context; - private HdfsApi hdfsApi; - private Job jobUnproxied; - private Job job; - private boolean modified; - - private OperationHandleControllerFactory opHandleControllerFactory; - private ConnectionController hiveConnection; - private SavedQueryResourceManager savedQueryResourceManager; - private IATSParser atsParser; - - /** - * JobController constructor - * Warning: Create JobControllers ONLY using JobControllerFactory! - */ - public JobControllerImpl(ViewContext context, Job job, - OperationHandleControllerFactory opHandleControllerFactory, - SavedQueryResourceManager savedQueryResourceManager, - IATSParser atsParser, - HdfsApi hdfsApi) { - this.context = context; - setJobPOJO(job); - this.opHandleControllerFactory = opHandleControllerFactory; - this.savedQueryResourceManager = savedQueryResourceManager; - this.atsParser = atsParser; - this.hdfsApi = hdfsApi; - - UserLocalConnection connectionLocal = new UserLocalConnection(); - this.hiveConnection = new ConnectionController(opHandleControllerFactory, connectionLocal.get(context)); - } - - public String getQueryForJob() { - FilePaginator paginator = new FilePaginator(job.getQueryFile(), hdfsApi); - String query; - try { - query = paginator.readPage(0); //warning - reading only 0 page restricts size of query to 1MB - } catch (IOException e) { - throw new ServiceFormattedException("F030 Error when reading file " + job.getQueryFile(), e); - } catch (InterruptedException e) { - throw new ServiceFormattedException("F030 Error when reading file " + job.getQueryFile(), e); - } - return query; - } - - private static final String DEFAULT_DB = "default"; - public String getJobDatabase() { - if (job.getDataBase() != null) { - return job.getDataBase(); - } else { - return DEFAULT_DB; - } - } - - @Override - public OperationHandleController.OperationStatus getStatus() throws ItemNotFound, HiveClientException, NoOperationStatusSetException { - OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job); - return handle.getOperationStatus(); - } - - @Override - public void submit() { - setupHiveBeforeQueryExecute(); - - String query = getQueryForJob(); - OperationHandleController handleController = hiveConnection.executeQuery(getSession(), query); - - handleController.persistHandleForJob(job); - } - - private void setupHiveBeforeQueryExecute() { - String database = getJobDatabase(); - hiveConnection.selectDatabase(getSession(), database); - } - - private TSessionHandle getSession() { - try { - if (job.getSessionTag() != null) { - return hiveConnection.getSessionByTag(getJob().getSessionTag()); - } - } catch (HiveClientException ignore) { - LOG.debug("Stale sessionTag was provided, new session will be opened"); - } - - String tag = hiveConnection.openSession(); - job.setSessionTag(tag); - try { - return hiveConnection.getSessionByTag(tag); - } catch (HiveClientException e) { - throw new HiveClientFormattedException(e); - } - } - - @Override - public void cancel() throws ItemNotFound { - OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job); - handle.cancel(); - } - - @Override - public void update() { - updateOperationStatus(); - updateOperationLogs(); - - updateJobDuration(); - } - - public void updateOperationStatus() { - try { - - OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job); - OperationHandleController.OperationStatus status = handle.getOperationStatus(); - job.setStatus(status.status); - job.setStatusMessage(status.message); - job.setSqlState(status.sqlState); - LOG.debug("Status of job#" + job.getId() + " is " + job.getStatus()); - - } catch (NoOperationStatusSetException e) { - LOG.info("Operation state is not set for job#" + job.getId()); - - } catch (HiveErrorStatusException e) { - LOG.debug("Error updating status for job#" + job.getId() + ": " + e.getMessage()); - job.setStatus(Job.JOB_STATE_UNKNOWN); - - } catch (HiveClientException e) { - throw new HiveClientFormattedException(e); - - } catch (ItemNotFound itemNotFound) { - LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't update status"); - } - } - - public void updateOperationLogs() { - try { - OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job); - String logs = handle.getLogs(); - - LogParser info = LogParser.parseLog(logs); - LogParser.AppId app = info.getLastAppInList(); - if (app != null) { - job.setApplicationId(app.getIdentifier()); - } - - String logFilePath = job.getLogFile(); - HdfsUtil.putStringToFile(hdfsApi, logFilePath, logs); - - } catch (HiveClientRuntimeException ex) { - LOG.error("Error while fetching logs: " + ex.getMessage()); - } catch (ItemNotFound itemNotFound) { - LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't read logs"); - } catch (HdfsApiException e) { - throw new ServiceFormattedException(e); - } - } - - public boolean isJobEnded() { - String status = job.getStatus(); - return status.equals(Job.JOB_STATE_FINISHED) || status.equals(Job.JOB_STATE_CANCELED) || - status.equals(Job.JOB_STATE_CLOSED) || status.equals(Job.JOB_STATE_ERROR) || - status.equals(Job.JOB_STATE_UNKNOWN); // Unknown is not finished, but polling makes no sense - } - - @Override - public Job getJob() { - return job; - } - - /** - * Use carefully. Returns unproxied bean object - * @return unproxied bean object - */ - @Override - public Job getJobPOJO() { - return jobUnproxied; - } - - public void setJobPOJO(Job jobPOJO) { - Job jobModifyNotificationProxy = (Job) Proxy.newProxyInstance(jobPOJO.getClass().getClassLoader(), - new Class[]{Job.class}, - new ModifyNotificationInvocationHandler(jobPOJO, this)); - this.job = jobModifyNotificationProxy; - - this.jobUnproxied = jobPOJO; - } - - @Override - public Cursor getResults() throws ItemNotFound { - OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job); - return handle.getResults(); - } - - @Override - public boolean hasResults() throws ItemNotFound { - OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job); - return handle.hasResults(); - } - - @Override - public void afterCreation() { - setupStatusDirIfNotPresent(); - setupQueryFileIfNotPresent(); - setupLogFileIfNotPresent(); - - setCreationDate(); - } - - public void setupLogFileIfNotPresent() { - if (job.getLogFile() == null || job.getLogFile().isEmpty()) { - setupLogFile(); - } - } - - public void setupQueryFileIfNotPresent() { - if (job.getQueryFile() == null || job.getQueryFile().isEmpty()) { - setupQueryFile(); - } - } - - public void setupStatusDirIfNotPresent() { - if (job.getStatusDir() == null || job.getStatusDir().isEmpty()) { - setupStatusDir(); - } - } - - private static final long MillisInSecond = 1000L; - - public void updateJobDuration() { - job.setDuration((System.currentTimeMillis() / MillisInSecond) - (job.getDateSubmitted() / MillisInSecond)); - } - - public void setCreationDate() { - job.setDateSubmitted(System.currentTimeMillis()); - } - - private void setupLogFile() { - LOG.debug("Creating log file for job#" + job.getId()); - - String logFile = job.getStatusDir() + "/" + "logs"; - try { - HdfsUtil.putStringToFile(hdfsApi, logFile, ""); - } catch (HdfsApiException e) { - throw new ServiceFormattedException(e); - } - - job.setLogFile(logFile); - LOG.debug("Log file for job#" + job.getId() + ": " + logFile); - } - - private void setupStatusDir() { - String newDirPrefix = makeStatusDirectoryPrefix(); - String newDir = null; - try { - newDir = HdfsUtil.findUnallocatedFileName(hdfsApi, newDirPrefix, ""); - } catch (HdfsApiException e) { - throw new ServiceFormattedException(e); - } - - job.setStatusDir(newDir); - LOG.debug("Status dir for job#" + job.getId() + ": " + newDir); - } - - private String makeStatusDirectoryPrefix() { - String userScriptsPath = context.getProperties().get("jobs.dir"); - - if (userScriptsPath == null) { // TODO: move check to initialization code - String msg = "jobs.dir is not configured!"; - LOG.error(msg); - throw new MisconfigurationFormattedException("jobs.dir"); - } - - String normalizedName = String.format("hive-job-%s", job.getId()); - String timestamp = new SimpleDateFormat("yyyy-MM-dd_hh-mm").format(new Date()); - return String.format(userScriptsPath + - "/%s-%s", normalizedName, timestamp); - } - - private void setupQueryFile() { - String statusDir = job.getStatusDir(); - assert statusDir != null : "setupStatusDir() should be called first"; - - String jobQueryFilePath = statusDir + "/" + "query.hql"; - - try { - - if (job.getForcedContent() != null) { - - HdfsUtil.putStringToFile(hdfsApi, jobQueryFilePath, job.getForcedContent()); - job.setForcedContent(""); // prevent forcedContent to be written to DB - - } - else if (job.getQueryId() != null) { - - String savedQueryFile = getRelatedSavedQueryFile(); - hdfsApi.copy(savedQueryFile, jobQueryFilePath); - job.setQueryFile(jobQueryFilePath); - - } else { - - throw new BadRequestFormattedException("queryId or forcedContent should be passed!", null); - - } - - } catch (IOException e) { - throw new ServiceFormattedException("F040 Error when creating file " + jobQueryFilePath, e); - } catch (InterruptedException e) { - throw new ServiceFormattedException("F040 Error when creating file " + jobQueryFilePath, e); - } catch (HdfsApiException e) { - throw new ServiceFormattedException(e); - } - job.setQueryFile(jobQueryFilePath); - - LOG.debug("Query file for job#" + job.getId() + ": " + jobQueryFilePath); - } - - private String getRelatedSavedQueryFile() { - SavedQuery savedQuery; - try { - savedQuery = savedQueryResourceManager.read(job.getQueryId()); - } catch (ItemNotFound itemNotFound) { - throw new BadRequestFormattedException("queryId not found!", itemNotFound); - } - return savedQuery.getQueryFile(); - } - - @Override - public boolean onModification(Object object) { - setModified(true); - return true; - } - - @Override - public boolean isModified() { - return modified; - } - - public void setModified(boolean modified) { - this.modified = modified; - } - - @Override - public void clearModified() { - setModified(false); - } -}