This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c1801fd [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap… c1801fd is described below commit c1801fd6da4a2dd5f37dc366b92bede669e8fda0 Author: wang-zhun <wangzhun6...@gmail.com> AuthorDate: Fri May 8 15:41:23 2020 -0500 [SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2] Fix test "specify a more specific type for the ap… ### What changes were proposed in this pull request? Update the input parameters for instantiating `RMAppManager` and `ClientRMService` ### Why are the changes needed? For hadoop3.2, if `RMAppManager` is not created correctly, the following exception will occur: ``` java.lang.RuntimeException: java.lang.NullPointerException at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135) at org.apache.hadoop.yarn.security.YarnAuthorizationProvider.getInstance(YarnAuthorizationProvider.java:55) at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.<init>(RMAppManager.java:117) ``` ### How was this patch tested? UTs Closes #28456 from wang-zhun/Fix-SPARK-31235. Authored-by: wang-zhun <wangzhun6...@gmail.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 86 +++++++++++----------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index b335e7f..7611ccd 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.{ClientRMService, RMAppMana import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler import org.apache.hadoop.yarn.server.security.ApplicationACLsManager import org.apache.hadoop.yarn.util.Records import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq} @@ -222,11 +223,50 @@ class ClientSuite extends SparkFunSuite with Matchers { 3 -> ("SPARK-SQL", "SPARK-SQL"), 4 -> ("012345678901234567890123", "01234567890123456789")) + // Mock yarn submit application + val yarnClient = mock(classOf[YarnClient]) + val rmApps = new ConcurrentHashMap[ApplicationId, RMApp]() + val rmContext = mock(classOf[RMContext]) + when(rmContext.getRMApps).thenReturn(rmApps) + val dispatcher = mock(classOf[Dispatcher]) + when(rmContext.getDispatcher).thenReturn(dispatcher) + when[EventHandler[_]](dispatcher.getEventHandler).thenReturn( + new EventHandler[Event[_]] { + override def handle(event: Event[_]): Unit = {} + } + ) + val writer = mock(classOf[RMApplicationHistoryWriter]) + when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer) + val publisher = mock(classOf[SystemMetricsPublisher]) + when(rmContext.getSystemMetricsPublisher).thenReturn(publisher) + val yarnScheduler = mock(classOf[YarnScheduler]) + val rmAppManager = new RMAppManager(rmContext, + yarnScheduler, + null, + mock(classOf[ApplicationACLsManager]), + new Configuration()) + val clientRMService = new ClientRMService(rmContext, + yarnScheduler, + rmAppManager, + null, + null, + null) + clientRMService.init(new Configuration()) + when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => { + val subContext = invocationOnMock.getArguments()(0) + .asInstanceOf[ApplicationSubmissionContext] + val request = Records.newRecord(classOf[SubmitApplicationRequest]) + request.setApplicationSubmissionContext(subContext) + clientRMService.submitApplication(request) + null + }) + + // Spark submit application + val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext])) + when(appContext.getUnmanagedAM).thenReturn(true) for ((id, (sourceType, targetType)) <- appTypes) { val sparkConf = new SparkConf().set("spark.yarn.applicationType", sourceType) val args = new ClientArguments(Array()) - - val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext])) val appId = ApplicationId.newInstance(123456, id) appContext.setApplicationId(appId) val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) @@ -237,48 +277,8 @@ class ClientSuite extends SparkFunSuite with Matchers { new YarnClientApplication(getNewApplicationResponse, appContext), containerLaunchContext) - val yarnClient = mock(classOf[YarnClient]) - when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => { - val subContext = invocationOnMock.getArguments()(0) - .asInstanceOf[ApplicationSubmissionContext] - val request = Records.newRecord(classOf[SubmitApplicationRequest]) - request.setApplicationSubmissionContext(subContext) - - val rmContext = mock(classOf[RMContext]) - val conf = mock(classOf[Configuration]) - val map = new ConcurrentHashMap[ApplicationId, RMApp]() - when(rmContext.getRMApps).thenReturn(map) - val dispatcher = mock(classOf[Dispatcher]) - when(rmContext.getDispatcher).thenReturn(dispatcher) - when[EventHandler[_]](dispatcher.getEventHandler).thenReturn( - new EventHandler[Event[_]] { - override def handle(event: Event[_]): Unit = {} - } - ) - val writer = mock(classOf[RMApplicationHistoryWriter]) - when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer) - val publisher = mock(classOf[SystemMetricsPublisher]) - when(rmContext.getSystemMetricsPublisher).thenReturn(publisher) - when(appContext.getUnmanagedAM).thenReturn(true) - - val rmAppManager = new RMAppManager(rmContext, - null, - null, - mock(classOf[ApplicationACLsManager]), - conf) - val clientRMService = new ClientRMService(rmContext, - null, - rmAppManager, - null, - null, - null) - clientRMService.submitApplication(request) - - assert(map.get(subContext.getApplicationId).getApplicationType === targetType) - null - }) - yarnClient.submitApplication(context) + assert(rmApps.get(appId).getApplicationType === targetType) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org