xintongsong commented on a change in pull request #11920:
URL: https://github.com/apache/flink/pull/11920#discussion_r417765252



##########
File path: flink-external-resource/pom.xml
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.11-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-external-resource</artifactId>
+       <name>flink-external-resource</name>

Review comment:
       nit: I would suggest to name this module "flink-external-resources"

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInformation.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+/**
+ * Information for GPU resource. Currently only including the GPU index.
+ */
+public class GPUInformation implements ExternalResourceInfo {
+
+       private static final String NAME = "GPU Card";

Review comment:
       What is the purpose of this field? It seems this is only used in 
`toString()`, then shouldn't it be a local variable?
   Also, I'm not entirely sure about the phrase `GPU Card`. Maybe `GPU Device`?

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInformation.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+/**
+ * Information for GPU resource. Currently only including the GPU index.
+ */
+public class GPUInformation implements ExternalResourceInfo {

Review comment:
       I would suggest to name this class `GPUInfo` or `GPUResourceInfo`.
   Ideally, it would be better to align the class name suffixes between 
base/sub classes.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInformation.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+/**
+ * Information for GPU resource. Currently only including the GPU index.
+ */
+public class GPUInformation implements ExternalResourceInfo {
+
+       private static final String NAME = "GPU Card";
+       private final String index;
+
+       public GPUInformation(String index) {
+               this.index = index;

Review comment:
       ```suggestion
                this.index = Preconditions.checkNotNull(index);
   ```

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";

Review comment:
       I think the resource name should not be hard coded. Ideally, as the user 
already specified the driver factory class, the driver probably should not be 
aware of the resource name.
   
   I think we should either pass-in only the related configurations 
`external-resource.<resource_name>.xyz` with the common prefix 
`external-resource.<resource_name>.` pruned, or pass-in the resource name 
associated to this driver as an argument in runtime.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);
+
+       private final Set<GPUInformation> gpuResources;
+
+       public GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkRuntimeException("Could not find config 
of the path of gpu discovery script.");
+               }
+
+               final File discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkRuntimeException("The gpu discovery 
script does not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               final String args = config.getString(DISCOVERY_SCRIPT_ARG);
+               final long gpuAmount = config.getLong(GPU_AMOUNT);
+               gpuResources = new HashSet<>();
+
+               if (gpuAmount <= 0) {
+                       LOG.warn("The amount of GPU should be positive.");
+                       return;
+               }
+
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && output != "") {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInformation(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);
+       }
+
+       @Override
+       public Set<GPUInformation> retrieveResourceInfo(long gpuAmount) {
+               return Collections.unmodifiableSet(gpuResources);
+       }

Review comment:
       The argument `gpuAmount` should not be ignored.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUInformation.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
+
+/**
+ * Information for GPU resource. Currently only including the GPU index.
+ */
+public class GPUInformation implements ExternalResourceInfo {
+
+       private static final String NAME = "GPU Card";
+       private final String index;
+
+       public GPUInformation(String index) {
+               this.index = index;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("%s(%s)", NAME, index);
+       }
+
+       public String getIndex() {
+               return index;
+       }
+}

Review comment:
       This class has been used for hash set. Should also define `hashCode` and 
`equals` for it.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);
+
+       private final Set<GPUInformation> gpuResources;
+
+       public GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkRuntimeException("Could not find config 
of the path of gpu discovery script.");
+               }
+
+               final File discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkRuntimeException("The gpu discovery 
script does not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               final String args = config.getString(DISCOVERY_SCRIPT_ARG);
+               final long gpuAmount = config.getLong(GPU_AMOUNT);
+               gpuResources = new HashSet<>();
+
+               if (gpuAmount <= 0) {
+                       LOG.warn("The amount of GPU should be positive.");
+                       return;
+               }
+
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && output != "") {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInformation(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);

Review comment:
       Not sure about executing the discovery script in the constructor.
   I think this could be done lazily when `retrieveResourceInfo` is first 
called.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);

Review comment:
       Why do we need this? Shouldn't the amount be passed in from outside, 
according to the contract of `ExternalResourceDriver.retrieveResourceInfo(long 
amount)`?

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);
+
+       private final Set<GPUInformation> gpuResources;
+
+       public GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkRuntimeException("Could not find config 
of the path of gpu discovery script.");
+               }
+
+               final File discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkRuntimeException("The gpu discovery 
script does not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               final String args = config.getString(DISCOVERY_SCRIPT_ARG);
+               final long gpuAmount = config.getLong(GPU_AMOUNT);
+               gpuResources = new HashSet<>();
+
+               if (gpuAmount <= 0) {
+                       LOG.warn("The amount of GPU should be positive.");
+                       return;
+               }
+
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && output != "") {

Review comment:
       ```suggestion
                if (output != null && !output.isEmpty()) {
   ```

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+normal_allocate() {

Review comment:
       I would call it `non_previlege_allocate` or so.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);
+
+       private final Set<GPUInformation> gpuResources;
+
+       public GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkRuntimeException("Could not find config 
of the path of gpu discovery script.");
+               }
+
+               final File discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkRuntimeException("The gpu discovery 
script does not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               final String args = config.getString(DISCOVERY_SCRIPT_ARG);
+               final long gpuAmount = config.getLong(GPU_AMOUNT);
+               gpuResources = new HashSet<>();
+
+               if (gpuAmount <= 0) {
+                       LOG.warn("The amount of GPU should be positive.");
+                       return;
+               }
+
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && output != "") {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInformation(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);
+       }
+
+       @Override
+       public Set<GPUInformation> retrieveResourceInfo(long gpuAmount) {
+               return Collections.unmodifiableSet(gpuResources);
+       }
+
+       private String executeDiscoveryScript(File discoveryScript, long 
gpuAmount, String args) throws Exception {
+
+               final StringBuilder cmdBuilder = new StringBuilder();
+               cmdBuilder
+                       .append(discoveryScript.getAbsolutePath())
+                       .append(" ")
+                       .append(gpuAmount)
+                       .append(" ")
+                       .append(args);
+
+               final Process process = 
Runtime.getRuntime().exec(cmdBuilder.toString());
+               final BufferedReader stdoutReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
+               final BufferedReader stderrReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream()));
+               final int exitVal = process.waitFor();
+               if (exitVal != 0) {
+                       final String stdout = 
stdoutReader.lines().collect(StringBuilder::new, StringBuilder::append, 
StringBuilder::append).toString();
+                       final String stderr = 
stderrReader.lines().collect(StringBuilder::new, StringBuilder::append, 
StringBuilder::append).toString();
+                       LOG.error("Discovery script exit with {}. With output 
{} {}", exitVal, stdout, stderr);
+                       throw new FlinkRuntimeException("Discovery script exit 
with non-zero.");
+               }
+               return stdoutReader.readLine();

Review comment:
       I think we can reuse `JavaBashTestBase#executeScript`. We could move 
this method to somewhere in `flink-core`.
   The implementation of this method looks much concise. We might need to 
modify this method a bit, regarding redirecting error stream and returning the 
exit code.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"

Review comment:
       The option names are not very descriptive to me.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"

Review comment:
       It would be better to give it an empty default value before the loop. 
This prevents accidentally taking an environment variable with the same name.
   Same for `CHECK_DEAD_PROCESS`.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);
+
+       private final Set<GPUInformation> gpuResources;
+
+       public GPUDriver(Configuration config) throws Exception {
+               final String discoveryScriptPath = 
config.getString(DISCOVERY_SCRIPT_PATH);
+               if (StringUtils.isNullOrWhitespaceOnly(discoveryScriptPath)) {
+                       throw new FlinkRuntimeException("Could not find config 
of the path of gpu discovery script.");
+               }
+
+               final File discoveryScript = new 
File(System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_HOME_DIR, ".") +
+                       "/" + discoveryScriptPath);
+               if (!discoveryScript.exists()) {
+                       throw new FlinkRuntimeException("The gpu discovery 
script does not exist in path " + discoveryScript.getAbsolutePath());
+               }
+
+               final String args = config.getString(DISCOVERY_SCRIPT_ARG);
+               final long gpuAmount = config.getLong(GPU_AMOUNT);
+               gpuResources = new HashSet<>();
+
+               if (gpuAmount <= 0) {
+                       LOG.warn("The amount of GPU should be positive.");
+                       return;
+               }
+
+               String output = executeDiscoveryScript(discoveryScript, 
gpuAmount, args);
+               if (output != null && output != "") {
+                       String[] indexes = output.split(",");
+                       for (String index : indexes) {
+                               gpuResources.add(new GPUInformation(index));
+                       }
+               }
+               LOG.info("Discover the GPU resource {}.", gpuResources);
+       }
+
+       @Override
+       public Set<GPUInformation> retrieveResourceInfo(long gpuAmount) {
+               return Collections.unmodifiableSet(gpuResources);
+       }
+
+       private String executeDiscoveryScript(File discoveryScript, long 
gpuAmount, String args) throws Exception {
+
+               final StringBuilder cmdBuilder = new StringBuilder();
+               cmdBuilder
+                       .append(discoveryScript.getAbsolutePath())
+                       .append(" ")
+                       .append(gpuAmount)
+                       .append(" ")
+                       .append(args);

Review comment:
       I think it's not necessary to use `StringBuilder` here. We do not have 
any if-branch or loops building this string. It would be more light-weighted to 
simply use `String`.
   Or `String []` if we decided to reuse `JavaBashTestBase#executeScript`.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.Test;
+
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link GPUDriver}.
+ */
+public class GPUDriverTest {
+
+       private static final String TEST_DISCOVERY_SCRIPT_PATH = 
"src/test/resources/test-gpu-discovery.sh";
+
+       @Test
+       public void testGPUDriverWithTestScript() throws Exception {
+               final int gpuAmount = 2;
+               final Configuration config = new Configuration();
+               config.setLong(GPUDriver.GPU_AMOUNT, gpuAmount);
+               config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, 
TEST_DISCOVERY_SCRIPT_PATH);
+
+               final GPUDriver gpuDriver = new GPUDriver(config);
+               final Set<GPUInformation> gpuResource = 
gpuDriver.retrieveResourceInfo(gpuAmount);
+
+               assertThat(gpuResource.size(), is(gpuAmount));
+       }
+
+       @Test
+       public void testGPUDriverWithInvalidAmount() throws Exception {
+               final int gpuAmount = -1;
+               final Configuration config = new Configuration();
+               config.setLong(GPUDriver.GPU_AMOUNT, gpuAmount);
+               config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, 
TEST_DISCOVERY_SCRIPT_PATH);
+
+               final GPUDriver gpuDriver = new GPUDriver(config);
+               final Set<GPUInformation> gpuResource = 
gpuDriver.retrieveResourceInfo(gpuAmount);
+
+               assertThat(gpuResource.size(), is(0));
+       }
+
+       @Test(expected = FlinkRuntimeException.class)
+       public void testGPUDriverWithoutConfigTestScript() throws Exception {
+               final Configuration config = new Configuration();
+
+               final GPUDriver gpuDriver = new GPUDriver(config);

Review comment:
       ```suggestion
                new GPUDriver(config);
   ```

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {

Review comment:
       ```suggestion
   class GPUDriver implements ExternalResourceDriver {
   ```

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift

Review comment:
       Usually, I would suggest to avoid `shift` for better readability.
   In this particular case, I think it probably make sense to use shift because 
we do not require certain order of the arguments. However, I would suggest to 
`shift` inside each case branch. It would be better to have an explicit `shift 
2` for `--assign-file` which takes two arguments, rather than relying on 
ignoring unknown tokens.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash

Review comment:
       Can we have some test cases verify the behavior of this script?

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+normal_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -lt $AMOUNT ]; then
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi
+    done
+    if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+      echo "Could not get enough GPU resources."
+      exit 1
+    fi
+    echo ${occupy_indexes[@]} | sed 's/ /,/g'

Review comment:
       The indention is not right.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.Test;
+
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link GPUDriver}.
+ */
+public class GPUDriverTest {

Review comment:
       ```suggestion
   public class GPUDriverTest extends TestLogger {
   ```

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/test/java/org/apache/flink/externalresource/gpu/GPUDriverTest.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.junit.Test;
+
+import java.util.Set;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link GPUDriver}.
+ */
+public class GPUDriverTest {
+
+       private static final String TEST_DISCOVERY_SCRIPT_PATH = 
"src/test/resources/test-gpu-discovery.sh";
+
+       @Test
+       public void testGPUDriverWithTestScript() throws Exception {
+               final int gpuAmount = 2;
+               final Configuration config = new Configuration();
+               config.setLong(GPUDriver.GPU_AMOUNT, gpuAmount);
+               config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, 
TEST_DISCOVERY_SCRIPT_PATH);
+
+               final GPUDriver gpuDriver = new GPUDriver(config);
+               final Set<GPUInformation> gpuResource = 
gpuDriver.retrieveResourceInfo(gpuAmount);
+
+               assertThat(gpuResource.size(), is(gpuAmount));
+       }
+
+       @Test
+       public void testGPUDriverWithInvalidAmount() throws Exception {
+               final int gpuAmount = -1;
+               final Configuration config = new Configuration();
+               config.setLong(GPUDriver.GPU_AMOUNT, gpuAmount);
+               config.setString(GPUDriver.DISCOVERY_SCRIPT_PATH, 
TEST_DISCOVERY_SCRIPT_PATH);
+
+               final GPUDriver gpuDriver = new GPUDriver(config);
+               final Set<GPUInformation> gpuResource = 
gpuDriver.retrieveResourceInfo(gpuAmount);
+
+               assertThat(gpuResource.size(), is(0));
+       }
+
+       @Test(expected = FlinkRuntimeException.class)
+       public void testGPUDriverWithoutConfigTestScript() throws Exception {
+               final Configuration config = new Configuration();
+
+               final GPUDriver gpuDriver = new GPUDriver(config);

Review comment:
       Same for the following 2 cases.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+normal_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -lt $AMOUNT ]; then
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi

Review comment:
       Shall we break the loop in an else branch?

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/test/resources/test-gpu-discovery.sh
##########
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash

Review comment:
       I would call this `testing-gpu-discovery.sh`.
   My understanding is that, `test-xyz` is usually some test cases that 
verifies the behavior of `xyz`, while `testing-xyz` is some special 
implementation of `xyz` for testing purpose.

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/java/org/apache/flink/externalresource/gpu/GPUDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.externalresource.gpu;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Driver takes the responsibility to discover GPU resources and provide the 
GPU resource information.
+ * It get the GPU information by executing user-defined discovery script.
+ */
+public class GPUDriver implements ExternalResourceDriver {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(GPUDriver.class);
+
+       private static final String RESOURCE_NAME = "gpu";
+
+       private static final String DISCOVERY_SCRIPT_PATH_SUFFIX = 
"param.discovery-script.path";
+
+       private static final String DISCOVERY_SCRIPT_ARGS_SUFFIX = 
"param.discovery-script.args";
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_PATH_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
DISCOVERY_SCRIPT_ARGS_SUFFIX))
+                       .stringType()
+                       .noDefaultValue();
+
+       @VisibleForTesting
+       static final ConfigOption<Long> GPU_AMOUNT =
+               
key(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME, 
ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX))
+                       .longType()
+                       .defaultValue(0L);
+
+       private final Set<GPUInformation> gpuResources;
+
+       public GPUDriver(Configuration config) throws Exception {

Review comment:
       ```suggestion
   GPUDriver(Configuration config) throws Exception {
   ```

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/test/resources/test-gpu-discovery.sh
##########
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [exit-non-zero]"
+  exit 1
+fi
+
+AMOUNT=$1
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+if [ "$2" == "exit-non-zero" ]; then
+  exit 1
+fi
+
+
+normal_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -lt $AMOUNT ]; then
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi
+    done
+    if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+      echo "Could not get enough GPU resources."
+      exit 1
+    fi
+    echo ${occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+indexes=($(seq 0 1 $AMOUNT))
+normal_allocate "${indexes[*]}"

Review comment:
       Can this script simply always return `seq ${AMOUNT} | paste -sd ',' -`, 
unless `exit-non-zero`?

##########
File path: 
flink-external-resource/flink-external-resource-gpu/src/main/resources/gpu-discovery.sh
##########
@@ -0,0 +1,121 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [ $# -lt 1 ]; then
+  echo "Usage: ./gpu-discovery-script.sh gpu-amount [--privilege] 
[--check-dead] [--assign-file filePath]"
+  exit 1
+fi
+
+
+AMOUNT=$1
+shift
+ASSIGNMENT_FILE="/var/tmp/flink-gpu-assignment"
+
+while [[ $# -ge 1 ]]
+do
+key="$1"
+  case $key in
+    --privilege)
+    PRIVILEGE_OPTION="privilege"
+    ;;
+    --check-dead)
+    CHECK_DEAD_PROCESS="check"
+    ;;
+    --assign-file)
+    ASSIGNMENT_FILE="$2"
+    ;;
+    *)
+    # unknown option
+    ;;
+  esac
+  shift
+done
+
+if [ $AMOUNT -eq 0 ]; then
+  exit 0
+fi
+
+normal_allocate() {
+  indexes=$1
+  occupy_indexes=()
+  for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -lt $AMOUNT ]; then
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi
+    done
+    if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+      echo "Could not get enough GPU resources."
+      exit 1
+    fi
+    echo ${occupy_indexes[@]} | sed 's/ /,/g'
+}
+
+privilege_allocate() {
+  indexes=$1
+  (
+    flock -x 200
+
+    occupy_indexes=()
+    deny_indexes=()
+    for i in $indexes
+    do
+      if [ ${#occupy_indexes[@]} -eq $AMOUNT ] || [ `grep -c "^$i " 
$ASSIGNMENT_FILE` -ne 0 ]; then
+        deny_indexes[${#deny_indexes[@]}]=$i
+      else
+        occupy_indexes[${#occupy_indexes[@]}]=$i
+      fi
+    done
+
+    if [ "$CHECK_DEAD_PROCESS" == "check" ]; then
+      for i in ${!deny_indexes[@]}
+      do
+        if [ ${#occupy_indexes[@]} -eq $AMOUNT ];then
+          break
+        fi
+        owner=`grep "^${deny_indexes[$i]} " $ASSIGNMENT_FILE | awk '{print 
$2}'`
+        if [ -n $owner ] && [ `ps -p $owner | grep -c $owner` -eq 0 ]; then
+          # The owner does not exist anymore. We could occupy it.
+          sed -i "/${deny_indexes[$i]} /d" $ASSIGNMENT_FILE
+          occupy_indexes[${#occupy_indexes[@]}]=${deny_indexes[$i]}
+          unset deny_indexes[$i]
+        fi
+      done
+    fi
+
+    if [ $AMOUNT -gt ${#occupy_indexes[@]} ]; then
+      echo "Could not get enough GPU resources."
+      exit 1
+    fi
+
+    for i in "${occupy_indexes[@]}"
+    do
+      echo "$i $PPID" >> $ASSIGNMENT_FILE
+    done
+
+    echo ${occupy_indexes[@]} | sed 's/ /,/g'
+  ) 200<> $ASSIGNMENT_FILE
+}
+
+IFS=',' read -r -a indexes <<< `nvidia-smi --query-gpu=index 
--format=csv,noheader`

Review comment:
       What if the `nvidia-smi` failed, or the command doesn't exist?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to