[ https://issues.apache.org/jira/browse/SPARK-26751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-26751:
------------------------------------

    Assignee: Apache Spark

> HiveSessionImpl might have memory leak since Operation do not close properly
> ----------------------------------------------------------------------------
>
> Key: SPARK-26751
> URL: https://issues.apache.org/jira/browse/SPARK-26751
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: zhoukang
> Assignee: Apache Spark
> Priority: Major
> Attachments: 26751.png
>
>
> When a statement runs in the background and fails with an exception that is not a HiveSQLException, memory can leak, because the operation is never removed from handleToOperation.
> The sequence of events is:
> 1. operation.run() throws an exception that is not a HiveSQLException.
> 2. As a result, opHandle is never added to opHandleSet, and operationManager.closeOperation(opHandle) is never called, because executeStatementInternal only catches HiveSQLException:
> {code:java}
> private OperationHandle executeStatementInternal(String statement,
>     Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
>   this.acquire(true);
>   OperationManager operationManager = this.getOperationManager();
>   ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(
>       this.getSession(), statement, confOverlay, runAsync);
>   OperationHandle opHandle = operation.getHandle();
>   OperationHandle e;
>   try {
>     operation.run();
>     this.opHandleSet.add(opHandle);
>     e = opHandle;
>   } catch (HiveSQLException var11) {
>     operationManager.closeOperation(opHandle);
>     throw var11;
>   } finally {
>     this.release(true);
>   }
>   return e;
> }
> {code}
> In background mode, such an exception can come from the submission code below: the RejectedExecutionException case is wrapped in a HiveSQLException, but the NonFatal case rethrows the original exception unchanged.
> {code}
> try {
>   // This submit blocks if no background threads are available to run this operation
>   val backgroundHandle =
>     parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
>   setBackgroundHandle(backgroundHandle)
> } catch {
>   case rejected: RejectedExecutionException =>
>     setState(OperationState.ERROR)
>     throw new HiveSQLException("The background threadpool cannot accept" +
>       " new task for execution, please retry the operation", rejected)
>   case NonFatal(e) =>
>     logError(s"Error executing query in background", e)
>     setState(OperationState.ERROR)
>     throw e
> }
> {code}
> 3. Closing the session does not call operationManager.closeOperation(opHandle) either, because close() only closes the handles in opHandleSet, and this opHandle was never added to it.
> {code}
> public void close() throws HiveSQLException {
>   try {
>     this.acquire(true);
>     Iterator ioe = this.opHandleSet.iterator();
>     while (ioe.hasNext()) {
>       OperationHandle opHandle = (OperationHandle) ioe.next();
>       this.operationManager.closeOperation(opHandle);
>     }
>     this.opHandleSet.clear();
>     this.cleanupSessionLogDir();
>     this.cleanupPipeoutFile();
>     HiveHistory ioe1 = this.sessionState.getHiveHistory();
>     if (null != ioe1) {
>       ioe1.closeStream();
>     }
>     try {
>       this.sessionState.close();
>     } finally {
>       this.sessionState = null;
>     }
>   } catch (IOException var17) {
>     throw new HiveSQLException("Failure to close", var17);
>   } finally {
>     if (this.sessionState != null) {
>       try {
>         this.sessionState.close();
>       } catch (Throwable var15) {
>         LOG.warn("Error closing session", var15);
>       }
>       this.sessionState = null;
>     }
>     this.release(true);
>   }
> }
> {code}
> 4. However, newExecuteStatementOperation puts the opHandle of every statement into handleToOperation, so an operation that fails as in step 1 stays in that map for the lifetime of the server:
> {code}
> val handleToOperation = ReflectionUtils
>   .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
>
> val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
> val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
>
> override def newExecuteStatementOperation(
>     parentSession: HiveSession,
>     statement: String,
>     confOverlay: JMap[String, String],
>     async: Boolean): ExecuteStatementOperation = synchronized {
>   val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
>   require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
>     s" initialized or had already closed.")
>   val conf = sqlContext.sessionState.conf
>   val hiveSessionState = parentSession.getSessionState
>   setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
>   setConfMap(conf, hiveSessionState.getHiveVariables)
>   val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
>   val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
>     runInBackground)(sqlContext, sessionToActivePool)
>   handleToOperation.put(operation.getHandle, operation)
>   logDebug(s"Created Operation for $statement with session=$parentSession, " +
>     s"runInBackground=$runInBackground")
>   operation
> }
> {code}
> Below is an example of the leak:
> !26751.png!
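The leak described in steps 1 to 4 can be reproduced in isolation with a small Java model. This is a hypothetical sketch, not Hive or Spark code: `LeakDemo`, `SessionSketch`, `OperationManagerSketch` and `HiveSQLExceptionSketch` are made-up stand-ins, and plain integer handles replace `OperationHandle`. `executeLeaky` mirrors the catch-only-HiveSQLException flow of `executeStatementInternal`; `executeFixed` shows one possible remedy, assumed here for illustration and not taken from the issue, which closes the operation in a `finally` block whenever `run()` did not complete, whatever the exception type.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Entry point; declared first so `java LeakDemo.java` runs it.
class LeakDemo {
    public static void main(String[] args) {
        SessionSketch leaky = new SessionSketch();
        SessionSketch fixed = new SessionSketch();
        RuntimeException failure = new IllegalStateException("submission failed");
        try { leaky.executeLeaky("SELECT 1", failure); } catch (Exception expected) { }
        try { fixed.executeFixed("SELECT 1", failure); } catch (Exception expected) { }
        leaky.close();
        fixed.close();
        System.out.println("leaky: " + leaky.manager.handleToOperation.size());
        System.out.println("fixed: " + fixed.manager.handleToOperation.size());
    }
}

// Hypothetical stand-in for HiveSQLException (a checked exception).
class HiveSQLExceptionSketch extends Exception {
    HiveSQLExceptionSketch(String message) {
        super(message);
    }
}

// Hypothetical operation manager: every new operation is registered in
// handleToOperation, and only closeOperation removes it again.
class OperationManagerSketch {
    final Map<Integer, String> handleToOperation = new ConcurrentHashMap<>();
    private int nextHandle = 0;

    synchronized int newOperation(String statement) {
        int handle = nextHandle++;
        handleToOperation.put(handle, statement);
        return handle;
    }

    void closeOperation(int handle) {
        handleToOperation.remove(handle);
    }
}

// Hypothetical session: close() only closes the handles stored in opHandleSet.
class SessionSketch {
    final OperationManagerSketch manager = new OperationManagerSketch();
    final Set<Integer> opHandleSet = new HashSet<>();

    // Same shape as executeStatementInternal: cleanup only on HiveSQLExceptionSketch.
    int executeLeaky(String statement, Exception failure) throws HiveSQLExceptionSketch {
        int handle = manager.newOperation(statement);
        try {
            run(failure);
            opHandleSet.add(handle);
            return handle;
        } catch (HiveSQLExceptionSketch e) {
            manager.closeOperation(handle);
            throw e;
        }
    }

    // Possible remedy: close the operation on ANY failure, checked or unchecked.
    int executeFixed(String statement, Exception failure) throws HiveSQLExceptionSketch {
        int handle = manager.newOperation(statement);
        boolean registered = false;
        try {
            run(failure);
            opHandleSet.add(handle);
            registered = true;
            return handle;
        } finally {
            if (!registered) {
                manager.closeOperation(handle);
            }
        }
    }

    void close() {
        for (int handle : opHandleSet) {
            manager.closeOperation(handle);
        }
        opHandleSet.clear();
    }

    // Simulates operation.run(): null means success; only HiveSQLExceptionSketch
    // and RuntimeException instances are thrown.
    private static void run(Exception failure) throws HiveSQLExceptionSketch {
        if (failure instanceof HiveSQLExceptionSketch) throw (HiveSQLExceptionSketch) failure;
        if (failure instanceof RuntimeException) throw (RuntimeException) failure;
    }
}
```

Running it prints `leaky: 1` and `fixed: 0`: both sessions were closed, but only the leaky variant leaves an entry behind in `handleToOperation`. The `finally` block with a success flag was chosen so the cleanup runs for every exception type without having to catch and rethrow `Throwable` explicitly.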