This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new a13c04ffc [CELEBORN-1258] Support to register application info with 
user identifier and extra info
a13c04ffc is described below

commit a13c04ffc471057ad5f4fbb164f160e99cebd0ba
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Sep 1 11:15:40 2025 +0800

    [CELEBORN-1258] Support to register application info with user identifier 
and extra info
    
    Support to register application info with user identifier and extra info.
    
    To provide more insight for the application information.
    
    A new RESTful API.
    
    UT.
    
    Closes #3428 from turboFei/app_info_uid.
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: Fei Wang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit d038dd2b32575e2cc12f6f3a1b7a7b8a15ceb544)
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  |  11 +-
 common/src/main/proto/TransportMessages.proto      |  17 ++
 .../org/apache/celeborn/common/CelebornConf.scala  |  34 +++-
 .../common/client/ApplicationInfoProvider.scala    |  31 +++
 .../client/DefaultApplicationInfoProvider.scala    |  26 +++
 .../celeborn/common/meta/ApplicationInfo.scala     |  31 +++
 .../common/protocol/message/ControlMessages.scala  |  29 ++-
 .../apache/celeborn/common/util/PbSerDeUtils.scala |  28 ++-
 .../org/apache/celeborn/common/util/Utils.scala    |  10 +-
 .../celeborn/common/util/PbSerDeUtilsTest.scala    |  13 ++
 docs/configuration/client.md                       |   2 +
 .../master/clustermeta/AbstractMetaManager.java    |  17 ++
 .../master/clustermeta/IMetadataHandler.java       |   3 +
 .../clustermeta/SingleMasterMetaManager.java       |   9 +
 .../master/clustermeta/ha/HAMasterMetaManager.java |  28 +++
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  12 ++
 master/src/main/proto/Resource.proto               |   9 +
 .../celeborn/service/deploy/master/Master.scala    |  18 ++
 .../master/http/api/v1/ApplicationResource.scala   |  24 ++-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       |  13 ++
 .../ha/RatisMasterStatusSystemSuiteJ.java          |  15 ++
 .../celeborn/rest/v1/master/ApplicationApi.java    |  68 +++++++
 .../celeborn/rest/v1/model/ApplicationInfo.java    | 211 +++++++++++++++++++++
 .../rest/v1/model/ApplicationInfoResponse.java     | 120 ++++++++++++
 .../src/main/openapi3/master_rest_v1.yaml          |  48 +++++
 .../tests/client/LifecycleManagerSuite.scala       |  27 +++
 26 files changed, 841 insertions(+), 13 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index b049e0932..c1db77d59 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -42,7 +42,7 @@ import 
org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
 import org.apache.celeborn.client.listener.WorkerStatusListener
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.CelebornConf.ACTIVE_STORAGE_TYPES
-import org.apache.celeborn.common.client.MasterClient
+import org.apache.celeborn.common.client.{ApplicationInfoProvider, 
MasterClient}
 import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ApplicationMeta, 
ShufflePartitionLocationInfo, WorkerInfo}
@@ -248,12 +248,21 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
   private val messagesHelper: TransportMessagesHelper = new 
TransportMessagesHelper()
 
+  /**
+   * Sends a `RegisterApplicationInfo` message to the master carrying this
+   * application's unique id, its user identifier and the extra info map
+   * produced by the configured `ApplicationInfoProvider`.
+   *
+   * The whole operation runs inside `Utils.tryLogNonFatalError`
+   * (presumably non-fatal failures are logged instead of propagated;
+   * confirm against `Utils`).
+   */
+  private def registerApplicationInfo(): Unit = {
+    Utils.tryLogNonFatalError {
+      val extraInfo =
+        ApplicationInfoProvider.instantiate(conf).provide().asJava
+      masterClient.send(
+        RegisterApplicationInfo(appUniqueId, userIdentifier, extraInfo))
+    }
+  }
+
   // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is 
executed, and
   // `masterClient` is initialized after `rpcEnv` is initialized, if method 
`onStart` contains
   // a reference to `masterClient`, there may be cases where `masterClient` is 
null when
   // `masterClient` is called. Therefore, it's necessary to uniformly execute 
the initialization
   // method at the end of the construction of the class to perform the 
initialization operations.
   private def initialize(): Unit = {
+    registerApplicationInfo()
     // noinspection ConvertExpressionToSAM
     commitManager.start()
     heartbeater.start()
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index d53d56842..6030bea19 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -114,6 +114,7 @@ enum MessageType {
   PUSH_MERGED_DATA_SPLIT_PARTITION_INFO = 91;
   GET_STAGE_END = 92;
   GET_STAGE_END_RESPONSE = 93;
+  REGISTER_APPLICATION_INFO = 96;
 }
 
 enum StreamType {
@@ -704,6 +705,7 @@ message PbSnapshotMetaInfo {
   map<string, int64> shuffleFallbackCounts = 19;
   int64 applicationTotalCount = 20;
   map<string, int64> applicationFallbackCounts = 21;
+  map<string, PbApplicationInfo> applicationInfos = 22;
 }
 
 message PbOpenStream {
@@ -855,6 +857,13 @@ message PbApplicationMeta {
   string secret = 2;
 }
 
+// Application info as stored in the master snapshot metadata
+// (PbSnapshotMetaInfo.applicationInfos maps to this message).
+message PbApplicationInfo {
+  // Application id.
+  string appId = 1;
+  // User identifier the application registered with.
+  PbUserIdentifier userIdentifier = 2;
+  // Extra key/value info supplied at registration.
+  map<string, string> extraInfo = 3;
+  // Registration timestamp; the master meta manager fills it from
+  // System.currentTimeMillis().
+  int64 registrationTime = 4;
+}
+
 message PbApplicationMetaRequest {
   string appId = 1;
 }
@@ -919,3 +928,11 @@ message PbGetStageEndResponse {
 message PbChunkOffsets {
   repeated int64 chunkOffset = 1;
 }
+
+
+// Payload of the REGISTER_APPLICATION_INFO transport message.
+message PbRegisterApplicationInfo {
+  // Application id.
+  string appId = 1;
+  // User identifier of the application.
+  PbUserIdentifier userIdentifier = 2;
+  // Extra key/value info to register for the application.
+  map<string, string> extraInfo = 3;
+  // Request id of the originating RegisterApplicationInfo message.
+  string requestId = 4;
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ef0bc321e..2e517ed5c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -30,6 +30,7 @@ import scala.util.matching.Regex
 import io.netty.channel.epoll.Epoll
 
 import 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
+import org.apache.celeborn.common.client.{ApplicationInfoProvider, 
DefaultApplicationInfoProvider}
 import org.apache.celeborn.common.identity.{DefaultIdentityProvider, 
HadoopBasedIdentityProvider, IdentityProvider}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.internal.config._
@@ -893,7 +894,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def metricsCollectCriticalEnabled: Boolean = 
get(METRICS_COLLECT_CRITICAL_ENABLED)
   def metricsCapacity: Int = get(METRICS_CAPACITY)
   def metricsExtraLabels: Map[String, String] =
-    get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap
+    get(METRICS_EXTRA_LABELS).map(Utils.parseKeyValuePair).toMap
   def metricsWorkerAppTopResourceConsumptionCount: Int =
     get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
   def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long =
@@ -946,6 +947,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
   def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
   def clientApplicationUUIDSuffixEnabled: Boolean = 
get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
+  // Value of `celeborn.client.application.info.provider`: the class name of
+  // the ApplicationInfoProvider to use.
+  def clientApplicationInfoProvider: String = 
get(CLIENT_APPLICATION_INFO_PROVIDER)
+  // User-specific application info: every configured `key=value` entry is
+  // parsed with Utils.parseKeyValuePair and collected into a map.
+  def clientApplicationInfoUserSpecific: Map[String, String] = {
+    val pairs = get(USER_SPECIFIC_APPLICATION_INFO)
+    pairs.map(Utils.parseKeyValuePair).toMap
+  }
 
   def appUniqueIdWithUUIDSuffix(appId: String): String = {
     if (clientApplicationUUIDSuffixEnabled) {
@@ -5571,6 +5575,30 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  // Class name of the ApplicationInfoProvider the client instantiates to
+  // build the extra info sent with the application registration.
+  // The doc text previously listed DefaultIdentityProvider as an optional
+  // value; that class is an IdentityProvider, not an
+  // ApplicationInfoProvider, so the sentence was dropped.
+  val CLIENT_APPLICATION_INFO_PROVIDER: ConfigEntry[String] =
+    buildConf("celeborn.client.application.info.provider")
+      .categories("client")
+      .doc(s"ApplicationInfoProvider class name. Default class is " +
+        s"`${classOf[DefaultApplicationInfoProvider].getName}`, which " +
+        "provides the user-specific application info configured by " +
+        "`celeborn.client.application.info.user-specific`.")
+      .version("0.6.1")
+      .stringConf
+      .createWithDefault(classOf[DefaultApplicationInfoProvider].getName)
+
+  // Extra `key=value` entries attached to the application registration.
+  // Every element must be parseable by Utils.parseKeyValuePair.
+  val USER_SPECIFIC_APPLICATION_INFO: ConfigEntry[Seq[String]] =
+    buildConf("celeborn.client.application.info.user-specific")
+      .categories("client")
+      .version("0.6.1")
+      .doc("User specific information for application registration, pattern is" +
+        " `<key1>=<value1>[,<key2>=<value2>]*`, e.g. `cluster=celeborn`.")
+      .stringConf
+      .toSequence
+      .checkValue(
+        // Parse each element for real. `_ => Try(Utils.parseKeyValuePair(_))`
+        // ignores the element and wraps a function value in Try, which is
+        // always a Success, so malformed pairs were never rejected.
+        pairs => pairs.forall(pair => Try(Utils.parseKeyValuePair(pair)).isSuccess),
+        "Allowed pattern is: `<key1>=<value1>[,<key2>=<value2>]*`")
+      .createWithDefault(Seq.empty)
+
   val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
     buildConf("celeborn.test.alternative.key")
       .withAlternative("celeborn.test.alternative.deprecatedKey")
@@ -5638,8 +5666,8 @@ object CelebornConf extends Logging {
       .stringConf
       .toSequence
       .checkValue(
-        labels => labels.map(_ => 
Try(Utils.parseMetricLabels(_))).forall(_.isSuccess),
-        "Allowed pattern is: 
`<label1_key>:<label1_value>[,<label2_key>:<label2_value>]*`")
+        labels => labels.map(_ => 
Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess),
+        "Allowed pattern is: 
`<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`")
       .createWithDefault(Seq.empty)
 
   val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala
 
b/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala
new file mode 100644
index 000000000..d2ccedbfe
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.Utils
+/**
+ * Supplies the extra key/value information of an application.
+ *
+ * Subclasses take a [[CelebornConf]] in their constructor; instances are
+ * created through `ApplicationInfoProvider.instantiate`.
+ */
+abstract class ApplicationInfoProvider(conf: CelebornConf) {
+  /** Returns the extra info as a key -> value map. */
+  def provide(): Map[String, String]
+}
+
+object ApplicationInfoProvider {
+
+  /**
+   * Creates the provider whose class name is given by
+   * `conf.clientApplicationInfoProvider`, delegating construction to
+   * `Utils.instantiateClassWithCelebornConf`.
+   */
+  def instantiate(conf: CelebornConf): ApplicationInfoProvider =
+    Utils.instantiateClassWithCelebornConf[ApplicationInfoProvider](
+      conf.clientApplicationInfoProvider,
+      conf)
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala
 
b/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala
new file mode 100644
index 000000000..99e31d843
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import org.apache.celeborn.common.CelebornConf
+
+/**
+ * Default provider: the extra info is exactly the user-specific entries
+ * exposed by `CelebornConf.clientApplicationInfoUserSpecific`.
+ */
+class DefaultApplicationInfoProvider(conf: CelebornConf)
+  extends ApplicationInfoProvider(conf) {
+
+  override def provide(): Map[String, String] =
+    conf.clientApplicationInfoUserSpecific
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala
new file mode 100644
index 000000000..e685c7da4
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta
+
+import java.util.{Map => JMap}
+
+import org.apache.celeborn.common.identity.UserIdentifier
+
+/**
+ * Application info registered with the master.
+ *
+ * @param appId application id
+ * @param userIdentifier user identifier of the application
+ * @param extraInfo extra key/value info supplied at registration
+ * @param registrationTime registration timestamp; the master meta manager
+ *                         sets it from `System.currentTimeMillis()`
+ */
+case class ApplicationInfo(
+    appId: String,
+    userIdentifier: UserIdentifier,
+    extraInfo: JMap[String, String],
+    registrationTime: Long)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 04c91e03b..c8146b9f2 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.common.protocol.message
 
 import java.util
-import java.util.{Collections, UUID}
+import java.util.{Collections, Map => JMap, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -417,6 +417,13 @@ object ControlMessages extends Logging {
 
   case class ApplicationLostResponse(status: StatusCode) extends MasterMessage
 
+  /**
+   * Master request that registers application info.
+   *
+   * @param applicationId application id
+   * @param userIdentifier user identifier of the application
+   * @param extraInfo extra key/value info to register
+   * @param requestId request id, defaulting to `ZERO_UUID`
+   */
+  case class RegisterApplicationInfo(
+      applicationId: String,
+      userIdentifier: UserIdentifier,
+      extraInfo: JMap[String, String],
+      override var requestId: String = ZERO_UUID)
+    extends MasterRequestMessage
+
   case class HeartbeatFromApplication(
       appId: String,
       totalWritten: Long,
@@ -843,6 +850,19 @@ object ControlMessages extends Logging {
         .setStatus(status.getValue).build().toByteArray
       new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload)
 
+    case RegisterApplicationInfo(
+          applicationId,
+          userIdentifier,
+          extraInfo,
+          requestId) =>
+      val payload = PbRegisterApplicationInfo.newBuilder()
+        .setAppId(applicationId)
+        .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
+        .putAllExtraInfo(extraInfo)
+        .setRequestId(requestId)
+        .build().toByteArray
+      new TransportMessage(MessageType.REGISTER_APPLICATION_INFO, payload)
+
     case HeartbeatFromApplication(
           appId,
           totalWritten,
@@ -1477,6 +1497,13 @@ object ControlMessages extends Logging {
 
       case GET_STAGE_END_RESPONSE_VALUE =>
         PbGetStageEndResponse.parseFrom(message.getPayload)
+
+      // Rebuild the request from its protobuf payload. The requestId is
+      // written to the payload on serialization, so it must be read back
+      // here; otherwise the message silently falls back to ZERO_UUID.
+      case REGISTER_APPLICATION_INFO_VALUE =>
+        val pbRegisterApplicationInfo =
+          PbRegisterApplicationInfo.parseFrom(message.getPayload)
+        RegisterApplicationInfo(
+          pbRegisterApplicationInfo.getAppId,
+          PbSerDeUtils.fromPbUserIdentifier(
+            pbRegisterApplicationInfo.getUserIdentifier),
+          pbRegisterApplicationInfo.getExtraInfoMap,
+          pbRegisterApplicationInfo.getRequestId)
     }
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 7f51e158f..df0393a71 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.InvalidProtocolBufferException
 
 import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, 
DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, 
WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta.{ApplicationInfo, ApplicationMeta, 
DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, 
WorkerEventInfo, WorkerInfo, WorkerStatus}
 import org.apache.celeborn.common.meta.MapFileMeta.SegmentIndex
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.PartitionLocation.Mode
@@ -435,6 +435,7 @@ object PbSerDeUtils {
       shutdownWorkers: java.util.Set[WorkerInfo],
       workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
       applicationMetas: ConcurrentHashMap[String, ApplicationMeta],
+      applicationInfos: ConcurrentHashMap[String, ApplicationInfo],
       decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
     val builder = PbSnapshotMetaInfo.newBuilder()
       .setEstimatedPartitionSize(estimatedPartitionSize)
@@ -471,6 +472,14 @@ object PbSerDeUtils {
     if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) {
       builder.putAllApplicationMetas(pbApplicationMetas)
     }
+
+    val pbApplicationInfos = applicationInfos.asScala.map {
+      case (appId, applicationInfo) => (appId, 
toPbApplicationInfo(applicationInfo))
+    }.asJava
+    if (localCollectionUtils.isNotEmpty(pbApplicationInfos)) {
+      builder.putAllApplicationInfos(pbApplicationInfos)
+    }
+
     builder.build()
   }
 
@@ -484,6 +493,23 @@ object PbSerDeUtils {
     ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
   }
 
+  /**
+   * Converts an [[ApplicationInfo]] into its protobuf form, copying the app
+   * id, user identifier, extra info entries and registration time.
+   */
+  def toPbApplicationInfo(applicationInfo: ApplicationInfo): PbApplicationInfo = {
+    val builder = PbApplicationInfo.newBuilder()
+    builder.setAppId(applicationInfo.appId)
+    builder.setUserIdentifier(toPbUserIdentifier(applicationInfo.userIdentifier))
+    builder.putAllExtraInfo(applicationInfo.extraInfo)
+    builder.setRegistrationTime(applicationInfo.registrationTime)
+    builder.build()
+  }
+
+  /**
+   * Rebuilds an [[ApplicationInfo]] from its protobuf form; the inverse of
+   * `toPbApplicationInfo`.
+   */
+  def fromPbApplicationInfo(pbApplicationInfo: PbApplicationInfo): ApplicationInfo = {
+    val userIdentifier = fromPbUserIdentifier(pbApplicationInfo.getUserIdentifier)
+    ApplicationInfo(
+      appId = pbApplicationInfo.getAppId,
+      userIdentifier = userIdentifier,
+      extraInfo = pbApplicationInfo.getExtraInfoMap,
+      registrationTime = pbApplicationInfo.getRegistrationTime)
+  }
+
   def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
     PbWorkerStatus.newBuilder()
       .setState(workerStatus.getState)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3fd090e77..7dec9f086 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1184,12 +1184,12 @@ object Utils extends Logging {
     }
   }
 
-  def parseMetricLabels(label: String): (String, String) = {
-    val labelPart = label.split("=")
-    if (labelPart.size != 2) {
-      throw new IllegalArgumentException(s"Illegal metric extra labels: 
$label")
+  /**
+   * Parses a single `key=value` string into a trimmed (key, value) tuple.
+   *
+   * @param pair text that must split on `=` into exactly two parts
+   * @return the trimmed key and the trimmed value
+   * @throws IllegalArgumentException if splitting on `=` does not yield
+   *                                  exactly two parts
+   */
+  def parseKeyValuePair(pair: String): (String, String) = {
+    val parts = pair.split("=")
+    if (parts.size != 2) {
+      throw new IllegalArgumentException(s"Illegal key=value pair: $pair")
     }
-    labelPart(0).trim -> labelPart(1).trim
+    parts(0).trim -> parts(1).trim
   }
 
   def getProcessId: String = 
ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 9d4059ff9..01ad15bbd 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.util
 
 import java.io.File
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -379,6 +380,18 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     assert(restoredApplicationMeta.equals(applicationMeta))
   }
 
+  test("fromAndToPbApplicationInfo") {
+    // Round-trip through protobuf and expect an equal case-class value.
+    val extraInfo = Collections.singletonMap("key", "value")
+    val original = ApplicationInfo(
+      "app1",
+      UserIdentifier("tenant", "user"),
+      extraInfo,
+      System.currentTimeMillis())
+    val restored =
+      PbSerDeUtils.fromPbApplicationInfo(PbSerDeUtils.toPbApplicationInfo(original))
+
+    assert(restored.equals(original))
+  }
+
   test("testPackedPartitionLocationPairCase1") {
     partitionLocation3.setPeer(partitionLocation2)
     val pairPb = PbSerDeUtils.toPbPackedPartitionLocationsPair(
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index ce75df174..2f2704209 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,8 @@ license: |
 | --- | ------- | --------- | ----------- | ----- | ---------- |
 | celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled | false | false 
| If this is true, Celeborn will adaptively split skewed partitions instead of 
reading them by Spark map range. Please note that this feature requires the 
`Celeborn-Optimize-Skew-Partitions-spark3_3.patch`.  | 0.6.0 |  | 
 | celeborn.client.application.heartbeatInterval | 10s | false | Interval for 
client to send heartbeat message to master. | 0.3.0 | 
celeborn.application.heartbeatInterval | 
+| celeborn.client.application.info.provider | 
org.apache.celeborn.common.client.DefaultApplicationInfoProvider | false | 
ApplicationInfoProvider class name. Default class is 
`org.apache.celeborn.common.client.DefaultApplicationInfoProvider`, which 
provides the user-specific application info configured by 
`celeborn.client.application.info.user-specific`. | 0.6.1 |  | 
+| celeborn.client.application.info.user-specific |  | false | User specific 
information for application registration, pattern is 
`<key1>=<value1>[,<key2>=<value2>]*`, e.g. `cluster=celeborn`. | 0.6.1 |  | 
 | celeborn.client.application.unregister.enabled | true | false | When true, 
Celeborn client will inform celeborn master the application is already shutdown 
during client exit, this allows the cluster to release resources immediately, 
resulting in resource savings. | 0.3.2 |  | 
 | celeborn.client.application.uuidSuffix.enabled | false | false | Whether to 
add UUID suffix for application id for unique. When `true`, add UUID suffix for 
unique application id. Currently, this only applies to Spark and MR. | 0.6.0 |  
| 
 | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable 
chunk prefetch when creating CelebornInputStream. | 0.5.1 |  | 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index a56b8beb0..2ed05eef6 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.meta.ApplicationInfo;
 import org.apache.celeborn.common.meta.ApplicationMeta;
 import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.DiskStatus;
@@ -96,9 +97,17 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public final Map<String, Long> shuffleFallbackCounts = 
JavaUtils.newConcurrentHashMap();
   public final Map<String, Long> applicationFallbackCounts = 
JavaUtils.newConcurrentHashMap();
 
+  public final ConcurrentHashMap<String, ApplicationInfo> applicationInfos =
+      JavaUtils.newConcurrentHashMap();
   public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
       JavaUtils.newConcurrentHashMap();
 
+  /**
+   * Records the application info for {@code appId} unless an entry for that
+   * id already exists. The registration time is taken from
+   * {@link System#currentTimeMillis()} at call time.
+   */
+  public void updateApplicationInfo(
+      String appId, UserIdentifier userIdentifier, Map<String, String> extraInfo) {
+    long registrationTime = System.currentTimeMillis();
+    ApplicationInfo info =
+        new ApplicationInfo(appId, userIdentifier, extraInfo, registrationTime);
+    applicationInfos.putIfAbsent(appId, info);
+  }
+
   public void updateRequestSlotsMeta(
       String shuffleKey, String hostName, Map<String, Map<String, Integer>> 
workerWithAllocations) {
     Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
@@ -170,6 +179,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     registeredAppAndShuffles.remove(appId);
     appHeartbeatTime.remove(appId);
     applicationMetas.remove(appId);
+    applicationInfos.remove(appId);
   }
 
   @VisibleForTesting
@@ -380,6 +390,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                 shutdownWorkers,
                 workerEventInfos,
                 applicationMetas,
+                applicationInfos,
                 decommissionWorkers)
             .toByteArray();
     Files.write(file.toPath(), snapshotBytes);
@@ -485,6 +496,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
           .forEach(
               (key, value) -> applicationMetas.put(key, 
PbSerDeUtils.fromPbApplicationMeta(value)));
 
+      snapshotMetaInfo
+          .getApplicationInfosMap()
+          .forEach(
+              (key, value) -> applicationInfos.put(key, 
PbSerDeUtils.fromPbApplicationInfo(value)));
+
       availableWorkers.addAll(
           workersMap.values().stream()
               .filter(worker -> isWorkerAvailable(worker))
@@ -524,6 +540,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     applicationFallbackCounts.clear();
     workerEventInfos.clear();
     applicationMetas.clear();
+    applicationInfos.clear();
   }
 
   public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> 
failedWorkers) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 34fd74e34..e3e063348 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -28,6 +28,9 @@ import org.apache.celeborn.common.meta.WorkerStatus;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 
 public interface IMetadataHandler {
+  /**
+   * Handles a request to register application info.
+   *
+   * @param appId application id
+   * @param userIdentifier user identifier of the application
+   * @param extraInfo extra key/value info to register
+   * @param requestId id of the originating request
+   */
+  void handleRegisterApplicationInfo(
+      String appId, UserIdentifier userIdentifier, Map<String, String> 
extraInfo, String requestId);
+
   void handleRequestSlots(
       String shuffleKey,
       String hostName,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 4d4fa3b1a..2e6d40b54 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -50,6 +50,15 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
     this.rackResolver = rackResolver;
   }
 
+  /**
+   * Registers the application info by updating the local metadata directly.
+   * {@code requestId} is not used by this implementation.
+   */
+  @Override
+  public void handleRegisterApplicationInfo(
+      String appId,
+      UserIdentifier userIdentifier,
+      Map<String, String> extraInfo,
+      String requestId) {
+    updateApplicationInfo(appId, userIdentifier, extraInfo);
+  }
+
   @Override
   public void handleRequestSlots(
       String shuffleKey,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index e3319ffc1..7ac781da8 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -67,6 +67,34 @@ public class HAMasterMetaManager extends AbstractMetaManager 
{
     this.ratisServer = ratisServer;
   }
 
+  /**
+   * Registers the application info by submitting a
+   * {@code RegisterApplicationInfo} resource request to the Ratis server.
+   *
+   * @param appId application id
+   * @param userIdentifier user identifier; its tenant id and name are copied
+   *     into the request
+   * @param extraInfo extra key/value info to register
+   * @param requestId request id set on the resource request
+   * @throws CelebornRuntimeException rethrown after being logged when the
+   *     submission fails
+   */
+  @Override
+  public void handleRegisterApplicationInfo(
+      String appId,
+      UserIdentifier userIdentifier,
+      Map<String, String> extraInfo,
+      String requestId) {
+    try {
+      ratisServer.submitRequest(
+          ResourceRequest.newBuilder()
+              .setCmdType(Type.RegisterApplicationInfo)
+              .setRequestId(requestId)
+              .setRegisterApplicationInfoRequest(
+                  ResourceProtos.RegisterApplicationInfoRequest.newBuilder()
+                      .setAppId(appId)
+                      .setUserIdentifier(
+                          ResourceProtos.UserIdentifier.newBuilder()
+                              .setTenantId(userIdentifier.tenantId())
+                              .setName(userIdentifier.name())
+                              .build())
+                      .putAllExtraInfo(extraInfo)
+                      .build())
+              .build());
+    } catch (CelebornRuntimeException e) {
+      // Name the operation that actually failed; the old message said
+      // "app lost", copied from a different handler.
+      LOG.error("Handle register application info for {} failed!", appId, e);
+      throw e;
+    }
+  }
+
   @Override
   public void handleRequestSlots(
       String shuffleKey,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index abe21a4de..b30bf42d5 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -135,6 +135,18 @@ public class MetaHandler {
           LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys);
           break;
 
+        case RegisterApplicationInfo:
+          appId = request.getRegisterApplicationInfoRequest().getAppId();
+          UserIdentifier userIdentifier =
+              new UserIdentifier(
+                  
request.getRegisterApplicationInfoRequest().getUserIdentifier().getTenantId(),
+                  
request.getRegisterApplicationInfoRequest().getUserIdentifier().getName());
+          Map<String, String> extraInfo =
+              request.getRegisterApplicationInfoRequest().getExtraInfoMap();
+          metaSystem.updateApplicationInfo(appId, userIdentifier, extraInfo);
+          LOG.debug("Handle register application info for {}/{}", appId, 
userIdentifier);
+          break;
+
         case AppHeartbeat:
           appId = request.getAppHeartbeatRequest().getAppId();
           long time = request.getAppHeartbeatRequest().getTime();
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index c8e9c2aee..938e57e4e 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -44,6 +44,8 @@ enum Type {
   BatchUnRegisterShuffle = 28;
 
   ReviseLostShuffles = 29;
+
+  RegisterApplicationInfo = 30;
 }
 
 enum WorkerEventType {
@@ -78,6 +80,7 @@ message ResourceRequest {
   optional ApplicationMetaRequest applicationMetaRequest = 23;
   optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 
24;
   optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25;
+  optional RegisterApplicationInfoRequest registerApplicationInfoRequest = 26;
   optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102;
 }
 
@@ -265,4 +268,10 @@ message ApplicationMetaRequest {
 message ReviseLostShufflesRequest {
   string appId = 1 ;
   repeated int32 lostShuffles = 2 ;
+}
+
+message RegisterApplicationInfoRequest {
+  string appId = 1;
+  UserIdentifier userIdentifier = 2;
+  map<string, string> extraInfo = 3;
 }
\ No newline at end of file
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 9a21a7bcd..908a2bbce 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
 import java.io.IOException
 import java.net.BindException
 import java.util
+import java.util.{Map => JMap}
 import java.util.Collections
 import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
@@ -428,6 +429,13 @@ private[celeborn] class Master(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
+    case RegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId) 
=>
+      logDebug(
+        s"Received RegisterApplicationInfo request for app 
$appId/$userIdentifier/$extraInfo.")
+      checkAuth(context, appId)
+      executeWithLeaderChecker(
+        context,
+        handleRegisterApplicationInfo(context, appId, userIdentifier, 
extraInfo, requestId))
     case HeartbeatFromApplication(
           appId,
           totalWritten,
@@ -1161,6 +1169,16 @@ private[celeborn] class Master(
     }
   }
 
+  /**
+   * Passes the application's registration info to the status system and then
+   * replies to the caller with `OneWayMessageResponse`.
+   */
+  private def handleRegisterApplicationInfo(
+      context: RpcCallContext,
+      appId: String,
+      userIdentifier: UserIdentifier,
+      extraInfo: JMap[String, String],
+      requestId: String): Unit = {
+    statusSystem.handleRegisterApplicationInfo(
+      appId,
+      userIdentifier,
+      extraInfo,
+      requestId)
+    // Reply only after the status system call has returned.
+    context.reply(OneWayMessageResponse)
+  }
+
   private def handleHeartbeatFromApplication(
       context: RpcCallContext,
       appId: String,
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
index f34e79159..ae7c18948 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.service.deploy.master.http.api.v1
 
-import javax.ws.rs.{Consumes, DELETE, GET, Path, POST, Produces}
+import javax.ws.rs.{Consumes, GET, Path, POST, Produces}
 import javax.ws.rs.core.MediaType
 
 import scala.collection.JavaConverters._
@@ -27,7 +27,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
 import io.swagger.v3.oas.annotations.responses.ApiResponse
 import io.swagger.v3.oas.annotations.tags.Tag
 
-import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData, 
ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse, 
HostnamesResponse, ReviseLostShufflesRequest}
+import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData, 
ApplicationInfo, ApplicationInfoResponse, ApplicationsHeartbeatResponse, 
DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest}
 import org.apache.celeborn.server.common.http.api.ApiRequestContext
 import org.apache.celeborn.service.deploy.master.Master
 
@@ -54,6 +54,26 @@ class ApplicationResource extends ApiRequestContext {
         }.toSeq.asJava)
   }
 
+  @Operation(description = "List all running application's info of the cluster.")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[ApplicationInfoResponse]))))
+  @GET
+  @Path("/info")
+  def applicationsInfo(): ApplicationInfoResponse = {
+    // Convert every (appId -> info) entry of the status system into its REST model.
+    val infos = statusSystem.applicationInfos.asScala.map { case (id, info) =>
+      new ApplicationInfo()
+        .appId(id)
+        .userIdentifier(info.userIdentifier.toString)
+        .extraInfo(info.extraInfo)
+        .registrationTime(info.registrationTime)
+    }.toSeq
+    new ApplicationInfoResponse().applications(infos.asJava)
+  }
+
   @Operation(description = "Delete resource of apps.")
   @ApiResponse(
     responseCode = "200",
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 4f4361ef0..242e3f9cf 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -958,4 +958,17 @@ public class DefaultMetaSystemSuiteJ {
     
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 
2);
     
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 
1);
   }
+
+  /** Registers application info and checks the user identifier and extra info stored for it. */
+  @Test
+  public void testRegisterApplicationInfo() {
+    final String appId = "app1";
+    final UserIdentifier user = new UserIdentifier("tenant", "celeborn");
+    final Map<String, String> extras = Collections.singletonMap("k1", "v1");
+
+    statusSystem.applicationInfos.clear();
+    statusSystem.handleRegisterApplicationInfo(appId, user, extras, getNewReqeustId());
+
+    assertEquals(statusSystem.applicationInfos.get(appId).userIdentifier(), user);
+    assertEquals(statusSystem.applicationInfos.get(appId).extraInfo(), extras);
+  }
 }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index be644869f..5b7c395fa 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1634,4 +1634,19 @@ public class RatisMasterStatusSystemSuiteJ {
     
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 
2);
     
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 
1);
   }
+
+  /**
+   * Registers application info through the leader's status system and checks the user
+   * identifier and extra info stored for it.
+   */
+  @Test
+  public void testRegisterApplicationInfo() {
+    AbstractMetaManager leader = pickLeaderStatusSystem();
+    Assert.assertNotNull(leader);
+
+    final String appId = "app1";
+    final UserIdentifier user = new UserIdentifier("tenant", "celeborn");
+    final Map<String, String> extras = Collections.singletonMap("k1", "v1");
+
+    leader.applicationInfos.clear();
+    leader.handleRegisterApplicationInfo(appId, user, extras, getNewReqeustId());
+
+    assertEquals(leader.applicationInfos.get(appId).userIdentifier(), user);
+    assertEquals(leader.applicationInfos.get(appId).extraInfo(), extras);
+  }
 }
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
index 47ecf36d1..4c8e51d1c 100644
--- 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
@@ -25,6 +25,7 @@ import org.apache.celeborn.rest.v1.master.invoker.BaseApi;
 import org.apache.celeborn.rest.v1.master.invoker.Configuration;
 import org.apache.celeborn.rest.v1.master.invoker.Pair;
 
+import org.apache.celeborn.rest.v1.model.ApplicationInfoResponse;
 import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse;
 import org.apache.celeborn.rest.v1.model.DeleteAppsRequest;
 import org.apache.celeborn.rest.v1.model.HandleResponse;
@@ -253,6 +254,73 @@ public class ApplicationApi extends BaseApi {
     );
   }
 
+  /**
+   * 
+   * List all running application&#39;s info of the cluster.
+   * @return ApplicationInfoResponse
+   * @throws ApiException if fails to make API call
+   */
+  public ApplicationInfoResponse getApplicationsInfo() throws ApiException {
+    return this.getApplicationsInfo(Collections.emptyMap());
+  }
+
+
+  /**
+   * 
+   * List all running application&#39;s info of the cluster.
+   * @param additionalHeaders additionalHeaders for this call
+   * @return ApplicationInfoResponse
+   * @throws ApiException if fails to make API call
+   */
+  public ApplicationInfoResponse getApplicationsInfo(Map<String, String> 
additionalHeaders) throws ApiException {
+    Object localVarPostBody = null;
+    
+    // create path and map variables
+    String localVarPath = "/api/v1/applications/info";
+
+    StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+    String localVarQueryParameterBaseName;
+    List<Pair> localVarQueryParams = new ArrayList<Pair>();
+    List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+    Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+    Map<String, String> localVarCookieParams = new HashMap<String, String>();
+    Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+    
+    localVarHeaderParams.putAll(additionalHeaders);
+
+    
+    
+    final String[] localVarAccepts = {
+      "application/json"
+    };
+    final String localVarAccept = 
apiClient.selectHeaderAccept(localVarAccepts);
+
+    final String[] localVarContentTypes = {
+      
+    };
+    final String localVarContentType = 
apiClient.selectHeaderContentType(localVarContentTypes);
+
+    String[] localVarAuthNames = new String[] { "basic" };
+
+    TypeReference<ApplicationInfoResponse> localVarReturnType = new 
TypeReference<ApplicationInfoResponse>() {};
+    return apiClient.invokeAPI(
+        localVarPath,
+        "GET",
+        localVarQueryParams,
+        localVarCollectionQueryParams,
+        localVarQueryStringJoiner.toString(),
+        localVarPostBody,
+        localVarHeaderParams,
+        localVarCookieParams,
+        localVarFormParams,
+        localVarAccept,
+        localVarContentType,
+        localVarAuthNames,
+        localVarReturnType
+    );
+  }
+
   /**
    * 
    * Revise lost shuffles or deleted shuffles of an application.
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java
new file mode 100644
index 000000000..f81f32ed3
--- /dev/null
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.HashMap;
+import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * ApplicationInfo
+ */
+@JsonPropertyOrder({
+  ApplicationInfo.JSON_PROPERTY_APP_ID,
+  ApplicationInfo.JSON_PROPERTY_USER_IDENTIFIER,
+  ApplicationInfo.JSON_PROPERTY_EXTRA_INFO,
+  ApplicationInfo.JSON_PROPERTY_REGISTRATION_TIME
+})
[email protected](value = 
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator 
version: 7.8.0")
+public class ApplicationInfo {
+  public static final String JSON_PROPERTY_APP_ID = "appId";
+  private String appId;
+
+  public static final String JSON_PROPERTY_USER_IDENTIFIER = "userIdentifier";
+  private String userIdentifier;
+
+  public static final String JSON_PROPERTY_EXTRA_INFO = "extraInfo";
+  private Map<String, String> extraInfo = new HashMap<>();
+
+  public static final String JSON_PROPERTY_REGISTRATION_TIME = 
"registrationTime";
+  private Long registrationTime;
+
+  public ApplicationInfo() {
+  }
+
+  public ApplicationInfo appId(String appId) {
+    
+    this.appId = appId;
+    return this;
+  }
+
+  /**
+   * The id of the application.
+   * @return appId
+   */
+  @javax.annotation.Nonnull
+  @JsonProperty(JSON_PROPERTY_APP_ID)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+  public String getAppId() {
+    return appId;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_APP_ID)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  public ApplicationInfo userIdentifier(String userIdentifier) {
+    
+    this.userIdentifier = userIdentifier;
+    return this;
+  }
+
+  /**
+   * The user identifier of the application.
+   * @return userIdentifier
+   */
+  @javax.annotation.Nonnull
+  @JsonProperty(JSON_PROPERTY_USER_IDENTIFIER)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+  public String getUserIdentifier() {
+    return userIdentifier;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_USER_IDENTIFIER)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+  public void setUserIdentifier(String userIdentifier) {
+    this.userIdentifier = userIdentifier;
+  }
+
+  public ApplicationInfo extraInfo(Map<String, String> extraInfo) {
+    
+    this.extraInfo = extraInfo;
+    return this;
+  }
+
+  public ApplicationInfo putExtraInfoItem(String key, String extraInfoItem) {
+    if (this.extraInfo == null) {
+      this.extraInfo = new HashMap<>();
+    }
+    this.extraInfo.put(key, extraInfoItem);
+    return this;
+  }
+
+  /**
+   * Extra information of the application.
+   * @return extraInfo
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_EXTRA_INFO)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Map<String, String> getExtraInfo() {
+    return extraInfo;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_EXTRA_INFO)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setExtraInfo(Map<String, String> extraInfo) {
+    this.extraInfo = extraInfo;
+  }
+
+  public ApplicationInfo registrationTime(Long registrationTime) {
+    
+    this.registrationTime = registrationTime;
+    return this;
+  }
+
+  /**
+   * The registration time of the application.
+   * @return registrationTime
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_REGISTRATION_TIME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Long getRegistrationTime() {
+    return registrationTime;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_REGISTRATION_TIME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setRegistrationTime(Long registrationTime) {
+    this.registrationTime = registrationTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ApplicationInfo applicationInfo = (ApplicationInfo) o;
+    return Objects.equals(this.appId, applicationInfo.appId) &&
+        Objects.equals(this.userIdentifier, applicationInfo.userIdentifier) &&
+        Objects.equals(this.extraInfo, applicationInfo.extraInfo) &&
+        Objects.equals(this.registrationTime, 
applicationInfo.registrationTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(appId, userIdentifier, extraInfo, registrationTime);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class ApplicationInfo {\n");
+    sb.append("    appId: ").append(toIndentedString(appId)).append("\n");
+    sb.append("    userIdentifier: 
").append(toIndentedString(userIdentifier)).append("\n");
+    sb.append("    extraInfo: 
").append(toIndentedString(extraInfo)).append("\n");
+    sb.append("    registrationTime: 
").append(toIndentedString(registrationTime)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git 
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java
 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java
new file mode 100644
index 000000000..906cc1c03
--- /dev/null
+++ 
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.celeborn.rest.v1.model.ApplicationInfo;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * ApplicationInfoResponse
+ */
+@JsonPropertyOrder({
+  ApplicationInfoResponse.JSON_PROPERTY_APPLICATIONS
+})
[email protected](value = 
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator 
version: 7.8.0")
+public class ApplicationInfoResponse {
+  public static final String JSON_PROPERTY_APPLICATIONS = "applications";
+  private List<ApplicationInfo> applications = new ArrayList<>();
+
+  public ApplicationInfoResponse() {
+  }
+
+  public ApplicationInfoResponse applications(List<ApplicationInfo> 
applications) {
+    
+    this.applications = applications;
+    return this;
+  }
+
+  public ApplicationInfoResponse addApplicationsItem(ApplicationInfo 
applicationsItem) {
+    if (this.applications == null) {
+      this.applications = new ArrayList<>();
+    }
+    this.applications.add(applicationsItem);
+    return this;
+  }
+
+  /**
+   * The application information.
+   * @return applications
+   */
+  @javax.annotation.Nonnull
+  @JsonProperty(JSON_PROPERTY_APPLICATIONS)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+  public List<ApplicationInfo> getApplications() {
+    return applications;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_APPLICATIONS)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+  public void setApplications(List<ApplicationInfo> applications) {
+    this.applications = applications;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ApplicationInfoResponse applicationInfoResponse = 
(ApplicationInfoResponse) o;
+    return Objects.equals(this.applications, 
applicationInfoResponse.applications);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(applications);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class ApplicationInfoResponse {\n");
+    sb.append("    applications: 
").append(toIndentedString(applications)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml 
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index d2ca71a55..28c2968b9 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -306,6 +306,20 @@ paths:
               schema:
                 $ref: '#/components/schemas/ApplicationsHeartbeatResponse'
 
+  /api/v1/applications/info:
+    get:
+      tags:
+        - Application
+      operationId: getApplicationsInfo
+      description: List all running application's info of the cluster.
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ApplicationInfoResponse'
+
   /api/v1/applications/delete_apps:
     post:
       tags:
@@ -772,6 +786,40 @@ components:
       required:
         - applications
 
+
+    ApplicationInfo:
+      type: object
+      properties:
+        appId:
+          type: string
+          description: The id of the application.
+        userIdentifier:
+          type: string
+          description: The user identifier of the application.
+        extraInfo:
+          type: object
+          description: Extra information of the application.
+          additionalProperties:
+            type: string
+        registrationTime:
+          type: integer
+          format: int64
+          description: The registration time of the application.
+      required:
+        - appId
+        - userIdentifier
+
+    ApplicationInfoResponse:
+      type: object
+      properties:
+        applications:
+          type: array
+          description: The application information.
+          items:
+            $ref: '#/components/schemas/ApplicationInfo'
+      required:
+        - applications
+
     HostnamesResponse:
       type: object
       properties:
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
index 87b05c766..d34c79419 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
@@ -19,16 +19,25 @@ package org.apache.celeborn.tests.client
 
 import java.util
 
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.{interval, timeout}
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
 import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 
 class LifecycleManagerSuite extends WithShuffleClientSuite with 
MiniClusterFeature {
+  override protected val userIdentifier = UserIdentifier("test", "celeborn")
 
   celebornConf
     .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
     .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+    .set(CelebornConf.USER_SPECIFIC_TENANT.key, userIdentifier.tenantId)
+    .set(CelebornConf.USER_SPECIFIC_USERNAME.key, userIdentifier.name)
+    .set(CelebornConf.USER_SPECIFIC_APPLICATION_INFO.key, "k1=v1")
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -99,6 +108,24 @@ class LifecycleManagerSuite extends WithShuffleClientSuite 
with MiniClusterFeatu
     lifecycleManager.stop()
   }
 
+  // Requests slots through a LifecycleManager configured with a tenant, user name and
+  // "k1=v1" application info, then waits until the master's status system holds the
+  // matching application info for APP.
+  test("CELEBORN-1258: Support to register application info with user identifier and extra info") {
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf)
+    try {
+      val arrayList = new util.ArrayList[Integer]()
+      (0 to 10).foreach(i => {
+        arrayList.add(i)
+      })
+
+      lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)
+
+      // Poll with a short pause instead of spinning with a zero interval.
+      eventually(timeout(3.seconds), interval(50.milliseconds)) {
+        val appInfo = masterInfo._1.statusSystem.applicationInfos.get(APP)
+        // Fail with a readable assertion rather than an NPE while no info is present yet.
+        assert(appInfo != null)
+        assert(appInfo.userIdentifier == userIdentifier)
+        assert(appInfo.extraInfo.get("k1") == "v1")
+        // `<=`: the registration time may equal the current millisecond.
+        assert(appInfo.registrationTime > 0 &&
+          appInfo.registrationTime <= System.currentTimeMillis())
+      }
+    } finally {
+      // Stop the manager even when an assertion fails.
+      lifecycleManager.stop()
+    }
+  }
+
   override def afterAll(): Unit = {
     logInfo("all test complete , stop celeborn mini cluster")
     shutdownMiniCluster()

Reply via email to