This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d038dd2b3 [CELEBORN-1258] Support to register application info with
user identifier and extra info
d038dd2b3 is described below
commit d038dd2b32575e2cc12f6f3a1b7a7b8a15ceb544
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Sep 1 11:15:40 2025 +0800
[CELEBORN-1258] Support to register application info with user identifier
and extra info
### What changes were proposed in this pull request?
Support registering application info with a user identifier and extra info.
### Why are the changes needed?
To provide more insight into the applications registered with the master.
### Does this PR introduce _any_ user-facing change?
A new RESTful API.
### How was this patch tested?
UT.
Closes #3428 from turboFei/app_info_uid.
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 13 +-
common/src/main/proto/TransportMessages.proto | 20 +-
.../org/apache/celeborn/common/CelebornConf.scala | 34 +++-
.../common/client/ApplicationInfoProvider.scala | 31 +++
.../client/DefaultApplicationInfoProvider.scala | 26 +++
.../celeborn/common/meta/ApplicationInfo.scala | 31 +++
.../common/protocol/message/ControlMessages.scala | 30 ++-
.../apache/celeborn/common/util/PbSerDeUtils.scala | 28 ++-
.../org/apache/celeborn/common/util/Utils.scala | 10 +-
.../celeborn/common/util/PbSerDeUtilsTest.scala | 13 ++
docs/configuration/client.md | 2 +
.../master/clustermeta/AbstractMetaManager.java | 17 ++
.../master/clustermeta/IMetadataHandler.java | 3 +
.../clustermeta/SingleMasterMetaManager.java | 9 +
.../master/clustermeta/ha/HAMasterMetaManager.java | 28 +++
.../deploy/master/clustermeta/ha/MetaHandler.java | 12 ++
master/src/main/proto/Resource.proto | 9 +
.../celeborn/service/deploy/master/Master.scala | 18 ++
.../master/http/api/v1/ApplicationResource.scala | 24 ++-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 13 ++
.../ha/RatisMasterStatusSystemSuiteJ.java | 15 ++
.../celeborn/rest/v1/master/ApplicationApi.java | 68 +++++++
.../celeborn/rest/v1/model/ApplicationInfo.java | 211 +++++++++++++++++++++
.../rest/v1/model/ApplicationInfoResponse.java | 120 ++++++++++++
.../src/main/openapi3/master_rest_v1.yaml | 48 +++++
.../tests/client/LifecycleManagerSuite.scala | 27 +++
26 files changed, 844 insertions(+), 16 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 14dfdafff..57349f130 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -21,7 +21,7 @@ import java.lang.{Byte => JByte}
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.util
-import java.util.{function, Collections, List => JList}
+import java.util.{function, List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
import java.util.function.{BiConsumer, BiFunction, Consumer}
@@ -42,7 +42,7 @@ import
org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.{CelebornConf, CommitMetadata}
import org.apache.celeborn.common.CelebornConf.ACTIVE_STORAGE_TYPES
-import org.apache.celeborn.common.client.MasterClient
+import org.apache.celeborn.common.client.{ApplicationInfoProvider,
MasterClient}
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta,
ShufflePartitionLocationInfo, WorkerInfo}
@@ -248,12 +248,21 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
private val messagesHelper: TransportMessagesHelper = new
TransportMessagesHelper()
+  /**
+   * Sends a `RegisterApplicationInfo` message to the master carrying this application's
+   * unique id, its user identifier and the extra info returned by the configured
+   * `ApplicationInfoProvider`.
+   *
+   * Both the provider lookup and the send are wrapped in `Utils.tryLogNonFatalError`.
+   * NOTE(review): presumably that helper logs and swallows non-fatal errors, which would make
+   * this a best-effort call -- confirm against `Utils`.
+   */
+  private def registerApplicationInfo(): Unit = {
+    Utils.tryLogNonFatalError {
+      val registration = RegisterApplicationInfo(
+        appUniqueId,
+        userIdentifier,
+        ApplicationInfoProvider.instantiate(conf).provide().asJava)
+      masterClient.send(registration)
+    }
+  }
+
// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is
executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method
`onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is
null when
// `masterClient` is called. Therefore, it's necessary to uniformly execute
the initialization
// method at the end of the construction of the class to perform the
initialization operations.
private def initialize(): Unit = {
+ registerApplicationInfo()
// noinspection ConvertExpressionToSAM
commitManager.start()
heartbeater.start()
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 76ec5b457..be8af738a 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -116,6 +116,7 @@ enum MessageType {
GET_STAGE_END_RESPONSE = 93;
READ_REDUCER_PARTITION_END = 94;
READ_REDUCER_PARTITION_END_RESPONSE = 95;
+ REGISTER_APPLICATION_INFO = 96;
}
enum StreamType {
@@ -753,6 +754,7 @@ message PbSnapshotMetaInfo {
map<string, int64> shuffleFallbackCounts = 19;
int64 applicationTotalCount = 20;
map<string, int64> applicationFallbackCounts = 21;
+ map<string, PbApplicationInfo> applicationInfos = 22;
}
message PbOpenStream {
@@ -904,6 +906,13 @@ message PbApplicationMeta {
string secret = 2;
}
+// Serialized form of ApplicationInfo. Stored per application id in
+// PbSnapshotMetaInfo.applicationInfos.
+message PbApplicationInfo {
+ string appId = 1;
+ PbUserIdentifier userIdentifier = 2;
+ // Free-form key/value pairs supplied at registration.
+ map<string, string> extraInfo = 3;
+ // Copied from ApplicationInfo.registrationTime.
+ int64 registrationTime = 4;
+}
+
message PbApplicationMetaRequest {
string appId = 1;
}
@@ -1005,6 +1014,7 @@ enum PbMetaRequestType {
ReportWorkerDecommission = 27;
BatchUnRegisterShuffle = 28;
ReviseLostShuffles = 29;
+ RegisterApplicationInfo = 30;
}
message PbMetaRequest {
@@ -1028,6 +1038,7 @@ message PbMetaRequest {
PbApplicationMeta applicationMetaRequest = 23;
PbReportWorkerDecommission reportWorkerDecommissionRequest = 24;
PbMetaBatchUnregisterShuffles batchUnregisterShuffleRequest = 25;
+ PbRegisterApplicationInfo registerApplicationInfoRequest = 26;
PbReviseLostShuffles reviseLostShufflesRequest = 102;
}
@@ -1074,4 +1085,11 @@ message PbMetaRequestResponse {
bool success = 2;
string message = 3;
PbMetaRequestStatus status = 4;
-}
\ No newline at end of file
+}
+
+// Payload of the REGISTER_APPLICATION_INFO transport message.
+message PbRegisterApplicationInfo {
+ string appId = 1;
+ PbUserIdentifier userIdentifier = 2;
+ // Free-form key/value pairs describing the application.
+ map<string, string> extraInfo = 3;
+ // Filled from RegisterApplicationInfo.requestId when the message is serialized.
+ string requestId = 4;
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 7379a19df..75c0dae22 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -30,6 +30,7 @@ import scala.util.matching.Regex
import io.netty.channel.epoll.Epoll
import
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
+import org.apache.celeborn.common.client.{ApplicationInfoProvider,
DefaultApplicationInfoProvider}
import org.apache.celeborn.common.identity.{DefaultIdentityProvider,
HadoopBasedIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
@@ -901,7 +902,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def metricsCollectCriticalEnabled: Boolean =
get(METRICS_COLLECT_CRITICAL_ENABLED)
def metricsCapacity: Int = get(METRICS_CAPACITY)
def metricsExtraLabels: Map[String, String] =
- get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap
+ get(METRICS_EXTRA_LABELS).map(Utils.parseKeyValuePair).toMap
def metricsWorkerAppTopResourceConsumptionCount: Int =
get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long =
@@ -954,6 +955,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
def clientApplicationUUIDSuffixEnabled: Boolean =
get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
+ // Class name read from `celeborn.client.application.info.provider`.
+ def clientApplicationInfoProvider: String =
get(CLIENT_APPLICATION_INFO_PROVIDER)
+ // Entries of `celeborn.client.application.info.user-specific`, each parsed with
+ // Utils.parseKeyValuePair (which throws IllegalArgumentException on a malformed entry).
+ def clientApplicationInfoUserSpecific: Map[String, String] =
+ get(USER_SPECIFIC_APPLICATION_INFO).map(Utils.parseKeyValuePair).toMap
def clientShuffleIntegrityCheckEnabled: Boolean =
get(CLIENT_SHUFFLE_INTEGRITY_CHECK_ENABLED)
@@ -5633,6 +5637,30 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ // Config entry holding the class name of the ApplicationInfoProvider implementation;
+ // defaults to DefaultApplicationInfoProvider.
+ // NOTE(review): the "Optional values" part of the doc text names DefaultIdentityProvider and
+ // talks about "user name and tenant id", which looks copied from the identity provider
+ // config -- confirm and reword together with the generated docs/configuration/client.md row.
+ val CLIENT_APPLICATION_INFO_PROVIDER: ConfigEntry[String] =
+ buildConf("celeborn.client.application.info.provider")
+ .categories("client")
+ .doc(s"ApplicationInfoProvider class name. Default class is " +
+ s"`${classOf[DefaultApplicationInfoProvider].getName}`. " +
+ s"Optional values: " +
+ s"${classOf[DefaultIdentityProvider].getName} user name and tenant id
are default values or user-specific values.")
+ .version("0.6.1")
+ .stringConf
+ .createWithDefault(classOf[DefaultApplicationInfoProvider].getName)
+
+  // User-specific `key=value` entries attached to the application info sent to the master.
+  val USER_SPECIFIC_APPLICATION_INFO: ConfigEntry[Seq[String]] =
+    buildConf("celeborn.client.application.info.user-specific")
+      .categories("client")
+      .version("0.6.1")
+      .doc("User specific information for application registration, pattern is" +
+        " `<key1>=<value1>[,<key2>=<value2>]*`, e.g. `cluster=celeborn`.")
+      .stringConf
+      .toSequence
+      .checkValue(
+        // Parse every entry. The previous `pairs.map(_ => Try(Utils.parseKeyValuePair(_)))`
+        // ignored the entry and wrapped a function literal in Try, so it always succeeded
+        // and never rejected a malformed pair.
+        pairs => pairs.forall(pair => Try(Utils.parseKeyValuePair(pair)).isSuccess),
+        "Allowed pattern is: `<key1>=<value1>[,<key2>=<value2>]*`")
+      .createWithDefault(Seq.empty)
+
val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
buildConf("celeborn.test.alternative.key")
.withAlternative("celeborn.test.alternative.deprecatedKey")
@@ -5700,8 +5728,8 @@ object CelebornConf extends Logging {
.stringConf
.toSequence
.checkValue(
- labels => labels.map(_ =>
Try(Utils.parseMetricLabels(_))).forall(_.isSuccess),
- "Allowed pattern is:
`<label1_key>:<label1_value>[,<label2_key>:<label2_value>]*`")
+ labels => labels.map(_ =>
Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess),
+ "Allowed pattern is:
`<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`")
.createWithDefault(Seq.empty)
val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala
new file mode 100644
index 000000000..d2ccedbfe
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.Utils
+/**
+ * Source of the extra key/value information that accompanies an application when it is
+ * registered. Concrete providers take a [[CelebornConf]] as their constructor argument.
+ */
+abstract class ApplicationInfoProvider(conf: CelebornConf) {
+
+  /** Returns the extra info entries for the application. */
+  def provide(): Map[String, String]
+}
+
+object ApplicationInfoProvider {
+
+  /**
+   * Builds the provider whose class name is given by `conf.clientApplicationInfoProvider`,
+   * using `Utils.instantiateClassWithCelebornConf`.
+   */
+  def instantiate(conf: CelebornConf): ApplicationInfoProvider =
+    Utils.instantiateClassWithCelebornConf[ApplicationInfoProvider](
+      conf.clientApplicationInfoProvider,
+      conf)
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala
b/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala
new file mode 100644
index 000000000..99e31d843
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.client
+
+import org.apache.celeborn.common.CelebornConf
+
+/**
+ * Default [[ApplicationInfoProvider]]: returns the user-specific key/value pairs exposed by
+ * `conf.clientApplicationInfoUserSpecific` unchanged.
+ */
+class DefaultApplicationInfoProvider(conf: CelebornConf) extends ApplicationInfoProvider(conf) {
+  override def provide(): Map[String, String] = conf.clientApplicationInfoUserSpecific
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala
new file mode 100644
index 000000000..e685c7da4
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta
+
+import java.util.{Map => JMap}
+
+import org.apache.celeborn.common.identity.UserIdentifier
+
+/**
+ * Information recorded for a registered application.
+ *
+ * @param appId application id
+ * @param userIdentifier user identifier the application was registered with
+ * @param extraInfo additional key/value pairs supplied at registration
+ * @param registrationTime timestamp assigned when the info was recorded
+ */
+case class ApplicationInfo(
+ appId: String,
+ userIdentifier: UserIdentifier,
+ extraInfo: JMap[String, String],
+ registrationTime: Long)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 7fcb0fd2f..0640e7e47 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -18,8 +18,7 @@
package org.apache.celeborn.common.protocol.message
import java.util
-import java.util.{Collections, UUID}
-import java.util.concurrent.atomic.AtomicIntegerArray
+import java.util.{Collections, Map => JMap, UUID}
import scala.collection.JavaConverters._
@@ -433,6 +432,13 @@ object ControlMessages extends Logging {
case class ApplicationLostResponse(status: StatusCode) extends MasterMessage
+ /**
+ * Master request carrying an application's id, user identifier and extra key/value info.
+ * `requestId` defaults to `ZERO_UUID` and is mutable (`override var`).
+ */
+ case class RegisterApplicationInfo(
+ applicationId: String,
+ userIdentifier: UserIdentifier,
+ extraInfo: JMap[String, String],
+ override var requestId: String = ZERO_UUID)
+ extends MasterRequestMessage
+
case class HeartbeatFromApplication(
appId: String,
totalWritten: Long,
@@ -878,6 +884,19 @@ object ControlMessages extends Logging {
.setStatus(status.getValue).build().toByteArray
new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload)
+ case RegisterApplicationInfo(
+ applicationId,
+ userIdentifier,
+ extraInfo,
+ requestId) =>
+ val payload = PbRegisterApplicationInfo.newBuilder()
+ .setAppId(applicationId)
+ .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
+ .putAllExtraInfo(extraInfo)
+ .setRequestId(requestId)
+ .build().toByteArray
+ new TransportMessage(MessageType.REGISTER_APPLICATION_INFO, payload)
+
case HeartbeatFromApplication(
appId,
totalWritten,
@@ -1537,6 +1556,13 @@ object ControlMessages extends Logging {
case GET_STAGE_END_RESPONSE_VALUE =>
PbGetStageEndResponse.parseFrom(message.getPayload)
+
+      case REGISTER_APPLICATION_INFO_VALUE =>
+        val pbRegisterApplicationInfo = PbRegisterApplicationInfo.parseFrom(message.getPayload)
+        RegisterApplicationInfo(
+          pbRegisterApplicationInfo.getAppId,
+          PbSerDeUtils.fromPbUserIdentifier(pbRegisterApplicationInfo.getUserIdentifier),
+          pbRegisterApplicationInfo.getExtraInfoMap,
+          // The serializer writes requestId into the payload; restore it here so the receiver
+          // does not silently fall back to the default ZERO_UUID.
+          pbRegisterApplicationInfo.getRequestId)
}
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 6b2c9bde7..b74a31fdf 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.InvalidProtocolBufferException
import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo,
DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo,
WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta.{ApplicationInfo, ApplicationMeta,
DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta,
WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.MapFileMeta.SegmentIndex
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
@@ -442,6 +442,7 @@ object PbSerDeUtils {
shutdownWorkers: java.util.Set[WorkerInfo],
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
applicationMetas: ConcurrentHashMap[String, ApplicationMeta],
+ applicationInfos: ConcurrentHashMap[String, ApplicationInfo],
decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
@@ -478,6 +479,14 @@ object PbSerDeUtils {
if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) {
builder.putAllApplicationMetas(pbApplicationMetas)
}
+
+ val pbApplicationInfos = applicationInfos.asScala.map {
+ case (appId, applicationInfo) => (appId,
toPbApplicationInfo(applicationInfo))
+ }.asJava
+ if (localCollectionUtils.isNotEmpty(pbApplicationInfos)) {
+ builder.putAllApplicationInfos(pbApplicationInfos)
+ }
+
builder.build()
}
@@ -491,6 +500,23 @@ object PbSerDeUtils {
ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
}
+  /** Converts an [[ApplicationInfo]] into its protobuf form, copying all four fields. */
+  def toPbApplicationInfo(applicationInfo: ApplicationInfo): PbApplicationInfo = {
+    val builder = PbApplicationInfo.newBuilder()
+    builder.setAppId(applicationInfo.appId)
+    builder.setUserIdentifier(toPbUserIdentifier(applicationInfo.userIdentifier))
+    builder.putAllExtraInfo(applicationInfo.extraInfo)
+    builder.setRegistrationTime(applicationInfo.registrationTime)
+    builder.build()
+  }
+
+  /** Rebuilds an [[ApplicationInfo]] from its protobuf form; inverse of toPbApplicationInfo. */
+  def fromPbApplicationInfo(pbApplicationInfo: PbApplicationInfo): ApplicationInfo = {
+    val userIdentifier = fromPbUserIdentifier(pbApplicationInfo.getUserIdentifier)
+    ApplicationInfo(
+      appId = pbApplicationInfo.getAppId,
+      userIdentifier = userIdentifier,
+      extraInfo = pbApplicationInfo.getExtraInfoMap,
+      registrationTime = pbApplicationInfo.getRegistrationTime)
+  }
+
def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
PbWorkerStatus.newBuilder()
.setState(workerStatus.getState)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index aa719cff1..cddcfec40 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1184,12 +1184,12 @@ object Utils extends Logging {
}
}
-  def parseMetricLabels(label: String): (String, String) = {
-    val labelPart = label.split("=")
-    if (labelPart.size != 2) {
-      throw new IllegalArgumentException(s"Illegal metric extra labels: $label")
+  /**
+   * Splits a `key=value` string into a trimmed (key, value) tuple.
+   *
+   * @param pair text expected to contain exactly one `=` separating key and value
+   * @return the trimmed key and value
+   * @throws IllegalArgumentException if splitting on `=` does not yield exactly two parts
+   */
+  def parseKeyValuePair(pair: String): (String, String) = {
+    val parts = pair.split("=")
+    if (parts.size != 2) {
+      throw new IllegalArgumentException(s"Illegal key=value pair: $pair")
 }
-    labelPart(0).trim -> labelPart(1).trim
+    parts(0).trim -> parts(1).trim
 }
def getProcessId: String =
ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index e8252c4d2..fa2ebbfbb 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.util
import java.io.File
import java.util
+import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -380,6 +381,18 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredApplicationMeta.equals(applicationMeta))
}
+  test("fromAndToPbApplicationInfo") {
+    // Round-trip through the protobuf form and expect an equal case class back.
+    val original = ApplicationInfo(
+      "app1",
+      UserIdentifier("tenant", "user"),
+      Collections.singletonMap("key", "value"),
+      System.currentTimeMillis())
+    val restored = PbSerDeUtils.fromPbApplicationInfo(PbSerDeUtils.toPbApplicationInfo(original))
+
+    assert(restored == original)
+  }
+
test("testPackedPartitionLocationPairCase1") {
partitionLocation3.setPeer(partitionLocation2)
val pairPb = PbSerDeUtils.toPbPackedPartitionLocationsPair(
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 8e640d0f3..8fd4ef30a 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -21,6 +21,8 @@ license: |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled | false | false
| If this is true, Celeborn will adaptively split skewed partitions instead of
reading them by Spark map range. Please note that this feature requires the
`Celeborn-Optimize-Skew-Partitions-spark3_3.patch`. | 0.6.0 | |
| celeborn.client.application.heartbeatInterval | 10s | false | Interval for
client to send heartbeat message to master. | 0.3.0 |
celeborn.application.heartbeatInterval |
+| celeborn.client.application.info.provider |
org.apache.celeborn.common.client.DefaultApplicationInfoProvider | false |
ApplicationInfoProvider class name. Default class is
`org.apache.celeborn.common.client.DefaultApplicationInfoProvider`. Optional
values: org.apache.celeborn.common.identity.DefaultIdentityProvider user name
and tenant id are default values or user-specific values. | 0.6.1 | |
+| celeborn.client.application.info.user-specific | | false | User specific
information for application registration, pattern is
`<key1>=<value1>[,<key2>=<value2>]*`, e.g. `cluster=celeborn`. | 0.6.1 | |
| celeborn.client.application.unregister.enabled | true | false | When true,
Celeborn client will inform celeborn master the application is already shutdown
during client exit, this allows the cluster to release resources immediately,
resulting in resource savings. | 0.3.2 | |
| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to
add UUID suffix for application id for unique. When `true`, add UUID suffix for
unique application id. Currently, this only applies to Spark and MR. | 0.6.0 |
|
| celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable
chunk prefetch when creating CelebornInputStream. | 0.5.1 | |
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 4ae879743..ae639d2e5 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.meta.ApplicationInfo;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.DiskStatus;
@@ -101,9 +102,17 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public final Map<String, Long> shuffleFallbackCounts =
JavaUtils.newConcurrentHashMap();
public final Map<String, Long> applicationFallbackCounts =
JavaUtils.newConcurrentHashMap();
+ public final ConcurrentHashMap<String, ApplicationInfo> applicationInfos =
+ JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
JavaUtils.newConcurrentHashMap();
+  /**
+   * Records the info of an application, stamped with the current wall-clock time.
+   *
+   * <p>Uses {@code putIfAbsent}, so an entry that already exists for {@code appId} is kept
+   * unchanged.
+   */
+  public void updateApplicationInfo(
+      String appId, UserIdentifier userIdentifier, Map<String, String> extraInfo) {
+    long registrationTime = System.currentTimeMillis();
+    ApplicationInfo applicationInfo =
+        new ApplicationInfo(appId, userIdentifier, extraInfo, registrationTime);
+    applicationInfos.putIfAbsent(appId, applicationInfo);
+  }
+
public void updateRequestSlotsMeta(
String shuffleKey, String hostName, Map<String, Map<String, Integer>>
workerWithAllocations) {
Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
@@ -175,6 +184,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
registeredAppAndShuffles.remove(appId);
appHeartbeatTime.remove(appId);
applicationMetas.remove(appId);
+ applicationInfos.remove(appId);
}
@VisibleForTesting
@@ -418,6 +428,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
shutdownWorkers,
workerEventInfos,
applicationMetas,
+ applicationInfos,
decommissionWorkers)
.toByteArray();
Files.write(file.toPath(), snapshotBytes);
@@ -523,6 +534,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
.forEach(
(key, value) -> applicationMetas.put(key,
PbSerDeUtils.fromPbApplicationMeta(value)));
+ snapshotMetaInfo
+ .getApplicationInfosMap()
+ .forEach(
+ (key, value) -> applicationInfos.put(key,
PbSerDeUtils.fromPbApplicationInfo(value)));
+
availableWorkers.addAll(
workersMap.values().stream()
.filter(worker -> isWorkerAvailable(worker))
@@ -562,6 +578,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
applicationFallbackCounts.clear();
workerEventInfos.clear();
applicationMetas.clear();
+ applicationInfos.clear();
}
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo>
failedWorkers) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 34fd74e34..e3e063348 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -28,6 +28,9 @@ import org.apache.celeborn.common.meta.WorkerStatus;
import org.apache.celeborn.common.quota.ResourceConsumption;
public interface IMetadataHandler {
+ void handleRegisterApplicationInfo(
+ String appId, UserIdentifier userIdentifier, Map<String, String>
extraInfo, String requestId);
+
void handleRequestSlots(
String shuffleKey,
String hostName,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 1a0beeffe..c4b2e843f 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -54,6 +54,15 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
this.rackResolver = rackResolver;
}
+ // Applies the registration directly through updateApplicationInfo; requestId is not used
+ // by this implementation.
+ @Override
+ public void handleRegisterApplicationInfo(
+ String appId,
+ UserIdentifier userIdentifier,
+ Map<String, String> extraInfo,
+ String requestId) {
+ updateApplicationInfo(appId, userIdentifier, extraInfo);
+ }
+
@Override
public void handleRequestSlots(
String shuffleKey,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 72f531d95..3372143aa 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -70,6 +70,34 @@ public class HAMasterMetaManager extends AbstractMetaManager
{
this.ratisServer = ratisServer;
}
+  /**
+   * Wraps the application info in a {@code RegisterApplicationInfo} resource request and
+   * submits it to the Ratis server.
+   *
+   * @throws CelebornRuntimeException rethrown after logging when the submission fails
+   */
+  @Override
+  public void handleRegisterApplicationInfo(
+      String appId,
+      UserIdentifier userIdentifier,
+      Map<String, String> extraInfo,
+      String requestId) {
+    try {
+      ratisServer.submitRequest(
+          ResourceRequest.newBuilder()
+              .setCmdType(Type.RegisterApplicationInfo)
+              .setRequestId(requestId)
+              .setRegisterApplicationInfoRequest(
+                  ResourceProtos.RegisterApplicationInfoRequest.newBuilder()
+                      .setAppId(appId)
+                      .setUserIdentifier(
+                          ResourceProtos.UserIdentifier.newBuilder()
+                              .setTenantId(userIdentifier.tenantId())
+                              .setName(userIdentifier.name())
+                              .build())
+                      .putAllExtraInfo(extraInfo)
+                      .build())
+              .build());
+    } catch (CelebornRuntimeException e) {
+      // The message previously said "Handle app lost", which did not match this operation.
+      LOG.error("Handle register application info for {} failed!", appId, e);
+      throw e;
+    }
+  }
+
@Override
public void handleRequestSlots(
String shuffleKey,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 8cbf4de95..fa18ce925 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -146,6 +146,18 @@ public class MetaHandler {
LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys);
break;
+ case RegisterApplicationInfo:
+ appId = request.getRegisterApplicationInfoRequest().getAppId();
+ UserIdentifier userIdentifier =
+ new UserIdentifier(
+
request.getRegisterApplicationInfoRequest().getUserIdentifier().getTenantId(),
+
request.getRegisterApplicationInfoRequest().getUserIdentifier().getName());
+ Map<String, String> extraInfo =
+ request.getRegisterApplicationInfoRequest().getExtraInfoMap();
+ metaSystem.updateApplicationInfo(appId, userIdentifier, extraInfo);
+ LOG.debug("Handle register application info for {}/{}", appId,
userIdentifier);
+ break;
+
case AppHeartbeat:
appId = request.getAppHeartbeatRequest().getAppId();
long time = request.getAppHeartbeatRequest().getTime();
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index e3c688aa1..a10a368fa 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -44,6 +44,8 @@ enum Type {
BatchUnRegisterShuffle = 28;
ReviseLostShuffles = 29;
+
+ RegisterApplicationInfo = 30;
}
enum WorkerEventType {
@@ -78,6 +80,7 @@ message ResourceRequest {
optional ApplicationMetaRequest applicationMetaRequest = 23;
optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest =
24;
optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25;
+ optional RegisterApplicationInfoRequest registerApplicationInfoRequest = 26;
optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102;
}
@@ -265,4 +268,10 @@ message ApplicationMetaRequest {
message ReviseLostShufflesRequest {
string appId = 1 ;
repeated int32 lostShuffles = 2 ;
+}
+
+// Payload of a ResourceRequest whose type is RegisterApplicationInfo: the application id,
+// the user identifier (tenant id and name) it runs as, and free-form key/value extra info.
+message RegisterApplicationInfoRequest {
+ string appId = 1;
+ UserIdentifier userIdentifier = 2;
+ map<string, string> extraInfo = 3;
 }
\ No newline at end of file
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index f4caae517..c5e31defb 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.BindException
import java.util
+import java.util.{Map => JMap}
import java.util.Collections
import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
@@ -429,6 +430,13 @@ private[celeborn] class Master(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
Unit] = {
+    // Register the application's user identifier and extra info. The request is authenticated
+    // for the app id first, then handled through the leader checker.
+    case RegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId) =>
+      logDebug(
+        s"Received RegisterApplicationInfo request for app $appId/$userIdentifier/$extraInfo.")
+      checkAuth(context, appId)
+      executeWithLeaderChecker(
+        context,
+        handleRegisterApplicationInfo(context, appId, userIdentifier, extraInfo, requestId))
case HeartbeatFromApplication(
appId,
totalWritten,
@@ -1166,6 +1174,16 @@ private[celeborn] class Master(
}
}
+  /**
+   * Handles a RegisterApplicationInfo request: passes the application's id, user identifier and
+   * extra info to the status system, then replies to the caller with [[OneWayMessageResponse]].
+   *
+   * @param context        RPC context used to send the reply
+   * @param appId          id of the application being registered
+   * @param userIdentifier user identifier of the application
+   * @param extraInfo      extra key/value information of the application
+   * @param requestId      id of this request, forwarded to the status system
+   */
+  private def handleRegisterApplicationInfo(
+      context: RpcCallContext,
+      appId: String,
+      userIdentifier: UserIdentifier,
+      extraInfo: JMap[String, String],
+      requestId: String): Unit = {
+    statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId)
+    context.reply(OneWayMessageResponse)
+  }
+
private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
index f34e79159..ae7c18948 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
-import javax.ws.rs.{Consumes, DELETE, GET, Path, POST, Produces}
+import javax.ws.rs.{Consumes, GET, Path, POST, Produces}
import javax.ws.rs.core.MediaType
import scala.collection.JavaConverters._
@@ -27,7 +27,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
-import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData,
ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse,
HostnamesResponse, ReviseLostShufflesRequest}
+import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData,
ApplicationInfo, ApplicationInfoResponse, ApplicationsHeartbeatResponse,
DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest}
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.service.deploy.master.Master
@@ -54,6 +54,26 @@ class ApplicationResource extends ApiRequestContext {
}.toSeq.asJava)
}
+  /**
+   * REST endpoint `GET .../info`: returns one [[ApplicationInfo]] entry (app id, user identifier
+   * rendered as a string, extra info and registration time) for every application currently
+   * present in the status system's `applicationInfos` map.
+   */
+  @Operation(description = "List all running application's info of the cluster.")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[ApplicationInfoResponse]))))
+  @GET
+  @Path("/info")
+  def applicationsInfo(): ApplicationInfoResponse = {
+    new ApplicationInfoResponse()
+      .applications(
+        statusSystem.applicationInfos.asScala.map { case (appId, appInfo) =>
+          new ApplicationInfo()
+            .appId(appId)
+            .userIdentifier(appInfo.userIdentifier.toString)
+            .extraInfo(appInfo.extraInfo)
+            .registrationTime(appInfo.registrationTime)
+        }.toSeq.asJava)
+  }
+
@Operation(description = "Delete resource of apps.")
@ApiResponse(
responseCode = "200",
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 04a984cbc..898e7686e 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -1134,4 +1134,17 @@ public class DefaultMetaSystemSuiteJ {
assertEquals(2,
statusSystem.applicationFallbackCounts.get(POLICY1).longValue());
assertEquals(1,
statusSystem.applicationFallbackCounts.get(POLICY2).longValue());
}
+
+  /** Registering application info must store the user identifier and extra info under the app id. */
+  @Test
+  public void testRegisterApplicationInfo() {
+    statusSystem.applicationInfos.clear();
+    UserIdentifier userIdentifier = new UserIdentifier("tenant", "celeborn");
+
+    String appId = "app1";
+    Map<String, String> extraInfo = Collections.singletonMap("k1", "v1");
+    statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, getNewReqeustId());
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+    assertEquals(userIdentifier, statusSystem.applicationInfos.get(appId).userIdentifier());
+    assertEquals(extraInfo, statusSystem.applicationInfos.get(appId).extraInfo());
+  }
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 952789a32..3e8d8ab60 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -1985,4 +1985,19 @@ public class RatisMasterStatusSystemSuiteJ {
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(),
2);
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(),
1);
}
+
+  /**
+   * Registering application info through the leader's status system must store the user
+   * identifier and extra info under the app id.
+   */
+  @Test
+  public void testRegisterApplicationInfo() {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+    statusSystem.applicationInfos.clear();
+    UserIdentifier userIdentifier = new UserIdentifier("tenant", "celeborn");
+
+    String appId = "app1";
+    Map<String, String> extraInfo = Collections.singletonMap("k1", "v1");
+    statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, getNewReqeustId());
+
+    // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+    assertEquals(userIdentifier, statusSystem.applicationInfos.get(appId).userIdentifier());
+    assertEquals(extraInfo, statusSystem.applicationInfos.get(appId).extraInfo());
+  }
}
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
index 47ecf36d1..4c8e51d1c 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java
@@ -25,6 +25,7 @@ import org.apache.celeborn.rest.v1.master.invoker.BaseApi;
import org.apache.celeborn.rest.v1.master.invoker.Configuration;
import org.apache.celeborn.rest.v1.master.invoker.Pair;
+import org.apache.celeborn.rest.v1.model.ApplicationInfoResponse;
import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse;
import org.apache.celeborn.rest.v1.model.DeleteAppsRequest;
import org.apache.celeborn.rest.v1.model.HandleResponse;
@@ -253,6 +254,73 @@ public class ApplicationApi extends BaseApi {
);
}
+ /**
+ * List all running application's info of the cluster.
+ * Convenience overload that delegates to {@link #getApplicationsInfo(Map)} with an empty
+ * map of additional headers.
+ * @return ApplicationInfoResponse
+ * @throws ApiException if fails to make API call
+ */
+ public ApplicationInfoResponse getApplicationsInfo() throws ApiException {
+ return this.getApplicationsInfo(Collections.emptyMap());
+ }
+
+
+  /**
+   * List all running application's info of the cluster.
+   *
+   * <p>Sends a GET to {@code /api/v1/applications/info}, accepting {@code application/json} and
+   * using the {@code basic} auth setting, and deserializes the body as an
+   * {@link ApplicationInfoResponse}.
+   *
+   * @param additionalHeaders additionalHeaders for this call
+   * @return ApplicationInfoResponse
+   * @throws ApiException if fails to make API call
+   */
+  public ApplicationInfoResponse getApplicationsInfo(Map<String, String> additionalHeaders) throws ApiException {
+    Object localVarPostBody = null;
+
+    // create path and map variables
+    String localVarPath = "/api/v1/applications/info";
+
+    StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+    String localVarQueryParameterBaseName;
+    List<Pair> localVarQueryParams = new ArrayList<Pair>();
+    List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+    Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+    Map<String, String> localVarCookieParams = new HashMap<String, String>();
+    Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+    // Merge the caller-supplied headers into the request headers.
+    localVarHeaderParams.putAll(additionalHeaders);
+
+    final String[] localVarAccepts = {
+      "application/json"
+    };
+    final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
+
+    // No request body is sent, so no content types are declared.
+    final String[] localVarContentTypes = {
+
+    };
+    final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
+
+    String[] localVarAuthNames = new String[] { "basic" };
+
+    TypeReference<ApplicationInfoResponse> localVarReturnType = new TypeReference<ApplicationInfoResponse>() {};
+    return apiClient.invokeAPI(
+        localVarPath,
+        "GET",
+        localVarQueryParams,
+        localVarCollectionQueryParams,
+        localVarQueryStringJoiner.toString(),
+        localVarPostBody,
+        localVarHeaderParams,
+        localVarCookieParams,
+        localVarFormParams,
+        localVarAccept,
+        localVarContentType,
+        localVarAuthNames,
+        localVarReturnType
+    );
+  }
+
/**
*
* Revise lost shuffles or deleted shuffles of an application.
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java
new file mode 100644
index 000000000..f81f32ed3
--- /dev/null
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.HashMap;
+import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * ApplicationInfo
+ *
+ * <p>REST model for one application: its id, user identifier, extra key/value info and
+ * registration time. {@code appId} and {@code userIdentifier} are always serialized; the other
+ * two properties use Jackson's default inclusion rules.
+ *
+ * <p>NOTE: the {@code Generated} annotation marks this class as output of the OpenAPI Java
+ * client code generator.
+ */
+@JsonPropertyOrder({
+  ApplicationInfo.JSON_PROPERTY_APP_ID,
+  ApplicationInfo.JSON_PROPERTY_USER_IDENTIFIER,
+  ApplicationInfo.JSON_PROPERTY_EXTRA_INFO,
+  ApplicationInfo.JSON_PROPERTY_REGISTRATION_TIME
+})
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
+public class ApplicationInfo {
+  public static final String JSON_PROPERTY_APP_ID = "appId";
+  private String appId;
+
+  public static final String JSON_PROPERTY_USER_IDENTIFIER = "userIdentifier";
+  private String userIdentifier;
+
+  public static final String JSON_PROPERTY_EXTRA_INFO = "extraInfo";
+  private Map<String, String> extraInfo = new HashMap<>();
+
+  public static final String JSON_PROPERTY_REGISTRATION_TIME = "registrationTime";
+  private Long registrationTime;
+
+  public ApplicationInfo() {
+  }
+
+  /** Fluent setter for {@code appId}; returns this instance for chaining. */
+  public ApplicationInfo appId(String appId) {
+
+    this.appId = appId;
+    return this;
+  }
+
+  /**
+   * The id of the application.
+   * @return appId
+   */
+  @javax.annotation.Nonnull
+  @JsonProperty(JSON_PROPERTY_APP_ID)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+  public String getAppId() {
+    return appId;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_APP_ID)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  /** Fluent setter for {@code userIdentifier}; returns this instance for chaining. */
+  public ApplicationInfo userIdentifier(String userIdentifier) {
+
+    this.userIdentifier = userIdentifier;
+    return this;
+  }
+
+  /**
+   * The user identifier of the application.
+   * @return userIdentifier
+   */
+  @javax.annotation.Nonnull
+  @JsonProperty(JSON_PROPERTY_USER_IDENTIFIER)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+  public String getUserIdentifier() {
+    return userIdentifier;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_USER_IDENTIFIER)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+  public void setUserIdentifier(String userIdentifier) {
+    this.userIdentifier = userIdentifier;
+  }
+
+  /** Fluent setter that replaces the whole {@code extraInfo} map; returns this instance. */
+  public ApplicationInfo extraInfo(Map<String, String> extraInfo) {
+
+    this.extraInfo = extraInfo;
+    return this;
+  }
+
+  /** Adds one entry to {@code extraInfo}, creating the map first if it is currently null. */
+  public ApplicationInfo putExtraInfoItem(String key, String extraInfoItem) {
+    if (this.extraInfo == null) {
+      this.extraInfo = new HashMap<>();
+    }
+    this.extraInfo.put(key, extraInfoItem);
+    return this;
+  }
+
+  /**
+   * Extra information of the application.
+   * @return extraInfo
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_EXTRA_INFO)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Map<String, String> getExtraInfo() {
+    return extraInfo;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_EXTRA_INFO)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setExtraInfo(Map<String, String> extraInfo) {
+    this.extraInfo = extraInfo;
+  }
+
+  /** Fluent setter for {@code registrationTime}; returns this instance for chaining. */
+  public ApplicationInfo registrationTime(Long registrationTime) {
+
+    this.registrationTime = registrationTime;
+    return this;
+  }
+
+  /**
+   * The registration time of the application.
+   * @return registrationTime
+   */
+  @javax.annotation.Nullable
+  @JsonProperty(JSON_PROPERTY_REGISTRATION_TIME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+  public Long getRegistrationTime() {
+    return registrationTime;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_REGISTRATION_TIME)
+  @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+  public void setRegistrationTime(Long registrationTime) {
+    this.registrationTime = registrationTime;
+  }
+
+  /** Two instances are equal when they are of the same class and all four properties are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ApplicationInfo applicationInfo = (ApplicationInfo) o;
+    return Objects.equals(this.appId, applicationInfo.appId) &&
+        Objects.equals(this.userIdentifier, applicationInfo.userIdentifier) &&
+        Objects.equals(this.extraInfo, applicationInfo.extraInfo) &&
+        Objects.equals(this.registrationTime, applicationInfo.registrationTime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(appId, userIdentifier, extraInfo, registrationTime);
+  }
+
+  /** Renders the class name followed by one indented {@code name: value} line per property. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class ApplicationInfo {\n");
+    sb.append("    appId: ").append(toIndentedString(appId)).append("\n");
+    sb.append("    userIdentifier: ").append(toIndentedString(userIdentifier)).append("\n");
+    sb.append("    extraInfo: ").append(toIndentedString(extraInfo)).append("\n");
+    sb.append("    registrationTime: ").append(toIndentedString(registrationTime)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java
new file mode 100644
index 000000000..906cc1c03
--- /dev/null
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.celeborn.rest.v1.model.ApplicationInfo;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * ApplicationInfoResponse
+ *
+ * <p>REST response model wrapping a list of {@link ApplicationInfo} entries under the
+ * {@code applications} property, which is always serialized.
+ *
+ * <p>NOTE: the {@code Generated} annotation marks this class as output of the OpenAPI Java
+ * client code generator.
+ */
+@JsonPropertyOrder({
+  ApplicationInfoResponse.JSON_PROPERTY_APPLICATIONS
+})
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
+public class ApplicationInfoResponse {
+  public static final String JSON_PROPERTY_APPLICATIONS = "applications";
+  private List<ApplicationInfo> applications = new ArrayList<>();
+
+  public ApplicationInfoResponse() {
+  }
+
+  /** Fluent setter that replaces the whole {@code applications} list; returns this instance. */
+  public ApplicationInfoResponse applications(List<ApplicationInfo> applications) {
+
+    this.applications = applications;
+    return this;
+  }
+
+  /** Appends one entry to {@code applications}, creating the list first if it is currently null. */
+  public ApplicationInfoResponse addApplicationsItem(ApplicationInfo applicationsItem) {
+    if (this.applications == null) {
+      this.applications = new ArrayList<>();
+    }
+    this.applications.add(applicationsItem);
+    return this;
+  }
+
+  /**
+   * The application information.
+   * @return applications
+   */
+  @javax.annotation.Nonnull
+  @JsonProperty(JSON_PROPERTY_APPLICATIONS)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+  public List<ApplicationInfo> getApplications() {
+    return applications;
+  }
+
+
+  @JsonProperty(JSON_PROPERTY_APPLICATIONS)
+  @JsonInclude(value = JsonInclude.Include.ALWAYS)
+  public void setApplications(List<ApplicationInfo> applications) {
+    this.applications = applications;
+  }
+
+  /** Two instances are equal when they are of the same class and their lists are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ApplicationInfoResponse applicationInfoResponse = (ApplicationInfoResponse) o;
+    return Objects.equals(this.applications, applicationInfoResponse.applications);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(applications);
+  }
+
+  /** Renders the class name followed by the indented {@code applications} property. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class ApplicationInfoResponse {\n");
+    sb.append("    applications: ").append(toIndentedString(applications)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index 675978dea..18e7de891 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -327,6 +327,20 @@ paths:
schema:
$ref: '#/components/schemas/ApplicationsHeartbeatResponse'
+  # Read-only endpoint returning the registered info of every running application.
+  /api/v1/applications/info:
+    get:
+      tags:
+        - Application
+      operationId: getApplicationsInfo
+      description: List all running application's info of the cluster.
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ApplicationInfoResponse'
+
/api/v1/applications/delete_apps:
post:
tags:
@@ -793,6 +807,40 @@ components:
required:
- applications
+
+    # One application's registered info; only appId and userIdentifier are mandatory.
+    ApplicationInfo:
+      type: object
+      properties:
+        appId:
+          type: string
+          description: The id of the application.
+        userIdentifier:
+          type: string
+          description: The user identifier of the application.
+        extraInfo:
+          type: object
+          description: Extra information of the application.
+          additionalProperties:
+            type: string
+        registrationTime:
+          type: integer
+          format: int64
+          description: The registration time of the application.
+      required:
+        - appId
+        - userIdentifier
+
+    # Response wrapper: the list of ApplicationInfo entries, always present.
+    ApplicationInfoResponse:
+      type: object
+      properties:
+        applications:
+          type: array
+          description: The application information.
+          items:
+            $ref: '#/components/schemas/ApplicationInfo'
+      required:
+        - applications
+
HostnamesResponse:
type: object
properties:
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
index 87b05c766..d34c79419 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala
@@ -19,16 +19,25 @@ package org.apache.celeborn.tests.client
import java.util
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.{interval, timeout}
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.service.deploy.MiniClusterFeature
class LifecycleManagerSuite extends WithShuffleClientSuite with
MiniClusterFeature {
+ override protected val userIdentifier = UserIdentifier("test", "celeborn")
celebornConf
.set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
+ .set(CelebornConf.USER_SPECIFIC_TENANT.key, userIdentifier.tenantId)
+ .set(CelebornConf.USER_SPECIFIC_USERNAME.key, userIdentifier.name)
+ .set(CelebornConf.USER_SPECIFIC_APPLICATION_INFO.key, "k1=v1")
override def beforeAll(): Unit = {
super.beforeAll()
@@ -99,6 +108,24 @@ class LifecycleManagerSuite extends WithShuffleClientSuite
with MiniClusterFeatu
lifecycleManager.stop()
}
+  test("CELEBORN-1258: Support to register application info with user identifier and extra info") {
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf)
+
+    val arrayList = new util.ArrayList[Integer]()
+    (0 to 10).foreach(i => {
+      arrayList.add(i)
+    })
+
+    lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)
+
+    try {
+      // The master may not have recorded the info yet, so poll until the entry shows up;
+      // a missing entry just fails the attempt and is retried until the timeout.
+      eventually(timeout(3.seconds), interval(0.milliseconds)) {
+        val appInfo = masterInfo._1.statusSystem.applicationInfos.get(APP)
+        assert(appInfo.userIdentifier == userIdentifier)
+        assert(appInfo.extraInfo.get("k1") == "v1")
+        assert(appInfo.registrationTime > 0)
+        // `<=` rather than `<`: registration and this check can land in the same millisecond.
+        assert(appInfo.registrationTime <= System.currentTimeMillis())
+      }
+    } finally {
+      // Stop the manager even when the assertions fail, as the other tests in this suite do.
+      lifecycleManager.stop()
+    }
+  }
+
override def afterAll(): Unit = {
logInfo("all test complete , stop celeborn mini cluster")
shutdownMiniCluster()