[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-16 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r426154092



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest extends TestLogger {
+
+   private static final String RESOURCE_NAME_1 = "foo";
+   private static final String RESOURCE_NAME_2 = "bar";
+   private static final List RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2);
+   private static final long RESOURCE_AMOUNT_1 = 2L;
+   private static final long RESOURCE_AMOUNT_2 = 1L;
+   private static final String RESOURCE_CONFIG_KEY_1 = "flink1";
+   private static final String RESOURCE_CONFIG_KEY_2 = "flink2";
+   private static final String SUFFIX = "flink.config-key";
+
+   @Test
+   public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() {
+      final Configuration config = new Configuration();
+      final String resourceConfigKey = "";
+
+      config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
+      config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1);

Review comment:
   That makes sense. Thanks for the explanation! I've updated the PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r426108639



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+   private static final String RESOURCE_NAME = "test";
+   private static final long RESOURCE_VALUE = 1;
+
+   @Test
+   public void testSetResourceInformationIfMethodPresent() {
+      final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+      final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+      resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE);
+
+      assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+      assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), is(RESOURCE_NAME));
+      assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodPresent() {
+      final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+      final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+      resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+      final Map externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+      assertThat(externalResources.size(), is(1));
+      assertTrue(externalResources.containsKey(RESOURCE_NAME));
+      assertThat(externalResources.get(RESOURCE_NAME), is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testSetResourceInformationIfMethodAbsent() {
+      final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+      final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+      resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE);
+
+      assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodAbsent() {
+      final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+      final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+      resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+      final Map externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+      assertThat(externalResources.entrySet(), is(empty()));
+   }
+
+   @Test
+   public void testSetAndGetExtendedResourcesWithYarnSupport() {
+ 

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425920446



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
+import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link ExternalResourceUtils} class.
+ */
+public class ExternalResourceUtilsTest extends TestLogger {
+
+   private static final String RESOURCE_NAME_1 = "foo";
+   private static final String RESOURCE_NAME_2 = "bar";
+   private static final List RESOURCE_LIST = Arrays.asList(RESOURCE_NAME_1, RESOURCE_NAME_2);
+   private static final long RESOURCE_AMOUNT_1 = 2L;
+   private static final long RESOURCE_AMOUNT_2 = 1L;
+   private static final String RESOURCE_CONFIG_KEY_1 = "flink1";
+   private static final String RESOURCE_CONFIG_KEY_2 = "flink2";
+   private static final String SUFFIX = "flink.config-key";
+
+   @Test
+   public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() {
+      final Configuration config = new Configuration();
+      final String resourceConfigKey = "";
+
+      config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
+      config.setLong(ExternalResourceOptions.keyWithResourceNameAndSuffix(RESOURCE_NAME_1, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), RESOURCE_AMOUNT_1);

Review comment:
   Not sure about that. From my side, `keyWithResourceNameAndSuffix` is
generic enough. Introducing a specific helper method for each config option
might bloat `ExternalResourceOptions` in the future. I admit it could
improve readability, though. WDYT?
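
For readers following the thread, the trade-off above can be sketched roughly as below. This is only an illustration, not the code in the PR: the class `ExternalResourceKeySketch`, the `amountKey` helper, and the `external-resource` prefix and `amount` suffix values are all assumptions made for the example. Only the name `keyWithResourceNameAndSuffix` comes from the diff.

```java
import java.util.Objects;

/** Hypothetical stand-in for the key-building part of ExternalResourceOptions. */
final class ExternalResourceKeySketch {

    // Assumed values for illustration; they are not taken from the diff.
    static final String PREFIX = "external-resource";
    static final String AMOUNT_SUFFIX = "amount";

    /** Generic form: one method, and the caller supplies the suffix. */
    static String keyWithResourceNameAndSuffix(String resourceName, String suffix) {
        Objects.requireNonNull(resourceName, "resourceName");
        Objects.requireNonNull(suffix, "suffix");
        return String.format("%s.%s.%s", PREFIX, resourceName, suffix);
    }

    /** Specific form (hypothetical): one small helper per config option. */
    static String amountKey(String resourceName) {
        return keyWithResourceNameAndSuffix(resourceName, AMOUNT_SUFFIX);
    }

    public static void main(String[] args) {
        // Both calls build the same key; only the call site differs.
        System.out.println(keyWithResourceNameAndSuffix("foo", AMOUNT_SUFFIX));
        System.out.println(amountKey("foo"));
    }
}
```

The specific helper shortens each call site, but every new per-resource option would need another method of this kind, which is the growth the comment is wary of.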









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425918081



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+   private static final String RESOURCE_NAME = "test";
+   private static final long RESOURCE_VALUE = 1;
+
+   @Test
+   public void testSetResourceInformationIfMethodPresent() {
+       final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+       final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+       resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE);
+
+       assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+       assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), is(RESOURCE_NAME));
+       assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodPresent() {
+       final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+       final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+       resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+       final Map<String, Long> externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+       assertThat(externalResources.size(), is(1));
+       assertTrue(externalResources.containsKey(RESOURCE_NAME));
+       assertThat(externalResources.get(RESOURCE_NAME), is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testSetResourceInformationIfMethodAbsent() {
+       final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+       final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+       resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, RESOURCE_NAME, RESOURCE_VALUE);
+
+       assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodAbsent() {
+       final ResourceInformationReflector resourceInformationReflector = new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), ResourceInfoWithMethod.class.getName());
+       final ResourceWithMethod resourceWithMethod = new ResourceWithMethod();
+       resourceWithMethod.setResourceInformation(RESOURCE_NAME, ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+       final Map<String, Long> externalResources = resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+       assertThat(externalResources.entrySet(), is(empty()));
+   }
+
+   @Test
+   public void testSetAndGetExtendedResourcesWithYarnSupport() {
+ 

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425898580




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425877967




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425871138



##
File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginManagerImpl.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Implementation of {@link PluginManager}.
+ */
+@Internal
+@ThreadSafe
+public class PluginManagerImpl implements PluginManager {

Review comment:
   I think `DefaultPluginManager` might be good enough. Sorry that I'm not 
good at naming or documenting.
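
For illustration, the interface/implementation split with that naming could look roughly like the sketch below. It is a simplified stand-in, not the actual Flink classes: `PluginLoaderSketch` and `DefaultPluginLoaderSketch` are hypothetical names, and plain `java.util.ServiceLoader` discovery is an assumption used in place of the real plugin class loaders.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

// Simplified, hypothetical sketch; not the actual Flink plugin classes.
interface PluginLoaderSketch {

    /** Returns all discovered implementations of the given service interface. */
    <P> Iterator<P> load(Class<P> service);
}

/** Default implementation, named after its role instead of carrying an "Impl" suffix. */
final class DefaultPluginLoaderSketch implements PluginLoaderSketch {

    @Override
    public <P> Iterator<P> load(Class<P> service) {
        final List<P> found = new ArrayList<>();
        // Assumption: ServiceLoader on the current class path stands in for plugin class loaders.
        for (P plugin : ServiceLoader.load(service)) {
            found.add(plugin);
        }
        return found.iterator();
    }
}
```

Callers depend only on the interface, so a testing implementation can be swapped in without touching the default one.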









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425868874



##
File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
##
@@ -18,42 +18,12 @@
 
 package org.apache.flink.core.plugin;
 
-import org.apache.flink.annotation.Internal;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 
 /**
- * Manager class and entry-point for the plugin mechanism in Flink.
+ * Interface for manager class and entry-point for the plugin mechanism in Flink.

Review comment:
   That would be more clear and descriptive. Thanks for the advice.









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425868874



##
File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
##
@@ -18,42 +18,12 @@
 
 package org.apache.flink.core.plugin;
 
-import org.apache.flink.annotation.Internal;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 
 /**
- * Manager class and entry-point for the plugin mechanism in Flink.
+ * Interface for manager class and entry-point for the plugin mechanism in Flink.

Review comment:
   That would be much more clear and descriptive. Thanks for the advice.









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425666112



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+   static final ResourceInformationReflector INSTANCE = new ResourceInformationReflector();
+
+   /** Class used to set the extended resource. */
+   private static final String RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceSetResourceInformationMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceGetResourcesMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetNameMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetValueMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationNewInstanceMethod;
+
+   private final boolean isYarnResourceTypesAvailable;
+
+   private ResourceInformationReflector() {
+       this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+   }
+
+   @VisibleForTesting
+   ResourceInformationReflector(String resourceClassName, String resourceInfoClassName) {
+       Method resourceSetResourceInformationMethod = null;
+       Method resourceGetResourcesMethod = null;
+       Method resourceInformationGetNameMethod = null;
+       Method resourceInformationGetValueMethod = null;
+       Method resourceInformationNewInstanceMethod = null;
+       boolean isYarnResourceTypesAvailable = false;
+       try {
+           final Class<?> resourceClass = Class.forName(resourceClassName);
+           final Class<?> resourceInfoClass = Class.forName(resourceInfoClassName);
+           resourceSetResourceInformationMethod = resourceClass.getMethod("setResourceInformation", String.class, resourceInfoClass);
+           resourceGetResourcesMethod = resourceClass.getMethod("getResources");
+           resourceInformationGetNameMethod = resourceInfoClass.getMethod("getName");
+           resourceInformationGetValueMethod = resourceInfoClass.getMethod("getValue");
+           resourceInformationNewInstanceMethod = resourceInfoClass.getMethod("newInstance", String.class, long.class);
+           isYarnResourceTypesAvailable = true;
+       } catch (Exception e) {
+           LOG.debug("The underlying Yarn does not support external resource.", e);
+       } finally {
+           this.resourceSetResourceInformationMethod = resourceSetResourceInformationMethod;
+           this.resourceGetResourcesMethod = resourceGetResourcesMethod;
+           this.resourceInformationGetNameMethod = resourceInformationGetNameMethod;
+           this.resourceInformationGetValueMethod = resourceInformationGetValueMethod;
+           this.resourceInformationNewInstanceMethod = resourceInformationN

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-15 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425632540




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425580321



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##
@@ -157,6 +158,13 @@
 
    GlobalAggregateManager getGlobalAggregateManager();
 
+   /**
+    * Get the enabled external resource drivers for external resources.
+    *
+    * @return the enabled external resource drivers for external resources
+    */
+   Map<String, ExternalResourceDriver> getExternalResourceDrivers();

Review comment:
   I think `Map<String, Set<ExternalResourceInfo>> getExternalResourceInfos` 
would be good enough for the current version.
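
As a rough sketch of what such an accessor could look like next to a per-name lookup: `ResourceInfoSketch` and `ExternalResourceInfoRegistrySketch` below are hypothetical placeholders, and keying by resource name is an assumption, not something taken from the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical placeholder for whatever information a driver reports.
final class ResourceInfoSketch {
    final String description;

    ResourceInfoSketch(String description) {
        this.description = description;
    }
}

// Hypothetical holder; assumes infos are keyed by resource name.
final class ExternalResourceInfoRegistrySketch {

    private final Map<String, Set<ResourceInfoSketch>> infosByResourceName = new HashMap<>();

    void register(String resourceName, ResourceInfoSketch info) {
        infosByResourceName.computeIfAbsent(resourceName, k -> new HashSet<>()).add(info);
    }

    /** Map-returning accessor: read-only view of the map (inner sets are not copied). */
    Map<String, Set<ResourceInfoSketch>> getExternalResourceInfos() {
        return Collections.unmodifiableMap(infosByResourceName);
    }

    /** Per-name lookup: returns an empty set for an unknown resource name. */
    Set<ResourceInfoSketch> getExternalResourceInfo(String resourceName) {
        return infosByResourceName.getOrDefault(resourceName, Collections.emptySet());
    }
}
```

The map form exposes everything in one call, while the per-name form keeps the surface smaller and never hands out the internal map.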





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425577548



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
##
@@ -157,6 +158,13 @@
 
GlobalAggregateManager getGlobalAggregateManager();
 
+   /**
+* Get the enabled external resource drivers for external resources.
+*
+* @return the enabled external resource drivers for external resources
+*/
+   Map getExternalResourceDrivers();

Review comment:
   I think exposing `Set getExternalResourceInfo(String resourceName)` 
would be good enough.









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425575563



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
##
@@ -183,8 +186,9 @@ protected void closeLocalStrategiesAndCaches() {
@Override
public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup 
metrics) {
Environment env = getEnvironment();
+
return new IterativeRuntimeUdfContext(env.getTaskInfo(), 
getUserCodeClassLoader(),
-   getExecutionConfig(), 
env.getDistributedCacheEntries(), this.accumulatorMap, metrics);
+   getExecutionConfig(), 
env.getDistributedCacheEntries(), this.accumulatorMap, metrics, 
ExternalResourceUtils.getExternalResourceInfo(env.getExternalResourceDrivers(), 
env.getTaskManagerInfo().getConfiguration()));

Review comment:
   I don't think it would be expensive, since we only retrieve the 
`ExternalResourceInfos` the first time. After that, the cached value in 
`SharedExternalResources` is used.
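
For illustration only, the retrieve-once-then-cache behaviour described above could be sketched as follows. `CachedResourceInfos` and its `retriever` function are hypothetical names, not the PR's `SharedExternalResources` implementation, and plain strings stand in for the info objects.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a per-resource cache; not the PR's actual class. */
final class CachedResourceInfos {

    /** Resource name -> infos retrieved on the first request. */
    private final Map<String, Set<String>> cache = new ConcurrentHashMap<>();

    /** Assumed (possibly expensive) lookup, e.g. a call into a driver. */
    private final Function<String, Set<String>> retriever;

    CachedResourceInfos(Function<String, Set<String>> retriever) {
        this.retriever = retriever;
    }

    /** Runs the retriever only on the first call per resource name. */
    Set<String> get(String resourceName) {
        return cache.computeIfAbsent(resourceName, retriever);
    }
}
```

With this shape, building a runtime context repeatedly only pays the retrieval cost once per resource name; every later call is a map lookup.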









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425552834



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+   private static final String RESOURCE_NAME = "test";
+   private static final long RESOURCE_VALUE = 1;
+
+   @Test
+   public void testSetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), 
is(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+   final Map externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(1));
+   assertTrue(externalResources.containsKey(RESOURCE_NAME));
+   assertThat(externalResources.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testSetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+   final Map externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(0));

Review comment:
   If so, it should be `assertThat(externalResources.keySet(), 
is(empty()));`. Personally, I prefer the current one. WDYT?




---

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425562704



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+   static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+   /** Class used to set the extended resource. */
+   private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceSetResourceInformationMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceGetResourcesMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetNameMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetValueMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationNewInstanceMethod;
+
+   private final boolean isYarnResourceTypesAvailable;
+
+   private ResourceInformationReflector() {
+   this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+   }
+
+   @VisibleForTesting
+   ResourceInformationReflector(String resourceClassName, String 
resourceInfoClassName) {
+   Method resourceSetResourceInformationMethod = null;
+   Method resourceGetResourcesMethod = null;
+   Method resourceInformationGetNameMethod = null;
+   Method resourceInformationGetValueMethod = null;
+   Method resourceInformationNewInstanceMethod = null;
+   boolean isYarnResourceTypesAvailable = false;
+   try {
+   final Class resourceClass = 
Class.forName(resourceClassName);
+   final Class resourceInfoClass = 
Class.forName(resourceInfoClassName);
+   resourceSetResourceInformationMethod = 
resourceClass.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+   resourceGetResourcesMethod = 
resourceClass.getMethod("getResources");
+   resourceInformationGetNameMethod = 
resourceInfoClass.getMethod("getName");
+   resourceInformationGetValueMethod = 
resourceInfoClass.getMethod("getValue");
+   resourceInformationNewInstanceMethod = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+   isYarnResourceTypesAvailable = true;
+   } catch (Exception e) {
+   LOG.debug("The underlying Yarn does not support 
external resource.", e);
+   } finally {
+   this.resourceSetResourceInformationMethod = 
resourceSetResourceInformationMethod;
+   this.resourceGetResourcesMethod = 
resourceGetResourcesMethod;
+   this.resourceInformationGetNameMethod = 
resourceInformationGetNameMethod;
+   this.resourceInformationGetValueMethod = 
resourceInformationGetValueMethod;
+   this.resourceInformationNewInstanceMethod = 
resourceInformationN

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425553720



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/ResourceInformationReflectorTest.java
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link ResourceInformationReflector}.
+ */
+public class ResourceInformationReflectorTest extends TestLogger {
+
+   private static final String RESOURCE_NAME = "test";
+   private static final long RESOURCE_VALUE = 1;
+
+   @Test
+   public void testSetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNotNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getName(), 
is(RESOURCE_NAME));
+   
assertThat(resourceWithMethod.getResourceWithName(RESOURCE_NAME).getValue(), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodPresent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+   final Map externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(1));
+   assertTrue(externalResources.containsKey(RESOURCE_NAME));
+   assertThat(externalResources.get(RESOURCE_NAME), 
is(RESOURCE_VALUE));
+   }
+
+   @Test
+   public void testSetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   
resourceInformationReflector.setResourceInformationUnSafe(resourceWithMethod, 
RESOURCE_NAME, RESOURCE_VALUE);
+
+   
assertNull(resourceWithMethod.getResourceWithName(RESOURCE_NAME));
+   }
+
+   @Test
+   public void testGetResourceInformationIfMethodAbsent() {
+   final ResourceInformationReflector resourceInformationReflector 
= new ResourceInformationReflector(ResourceWithoutMethod.class.getName(), 
ResourceInfoWithMethod.class.getName());
+   final ResourceWithMethod resourceWithMethod = new 
ResourceWithMethod();
+   resourceWithMethod.setResourceInformation(RESOURCE_NAME, 
ResourceInfoWithMethod.newInstance(RESOURCE_NAME, RESOURCE_VALUE));
+
+   final Map externalResources = 
resourceInformationReflector.getExternalResourcesUnSafe(resourceWithMethod);
+   assertThat(externalResources.size(), is(0));
+   }
+
+   @Test
+   public void testSetAndGetExtendedResourcesWithYarnSupport() {
+   assumeTrue(HadoopUtils.isMinHadoopVersio

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425550221



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.0+ or 2.10+.
+ */
+class ResourceInformationReflector {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ResourceInformationReflector.class);
+
+   static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+   /** Class used to set the extended resource. */
+   private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceSetResourceInformationMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceGetResourcesMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetNameMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationGetValueMethod;
+
+   /** Could be Null iff isYarnResourceTypesAvailable is false. */
+   @Nullable
+   private final Method resourceInformationNewInstanceMethod;
+
+   private final boolean isYarnResourceTypesAvailable;
+
+   private ResourceInformationReflector() {
+   this(Resource.class.getName(), RESOURCE_INFO_CLASS);
+   }
+
+   @VisibleForTesting
+   ResourceInformationReflector(String resourceClassName, String 
resourceInfoClassName) {
+   Method resourceSetResourceInformationMethod = null;
+   Method resourceGetResourcesMethod = null;
+   Method resourceInformationGetNameMethod = null;
+   Method resourceInformationGetValueMethod = null;
+   Method resourceInformationNewInstanceMethod = null;
+   boolean isYarnResourceTypesAvailable = false;
+   try {
+   final Class resourceClass = 
Class.forName(resourceClassName);
+   final Class resourceInfoClass = 
Class.forName(resourceInfoClassName);
+   resourceSetResourceInformationMethod = 
resourceClass.getMethod("setResourceInformation", String.class, 
resourceInfoClass);
+   resourceGetResourcesMethod = 
resourceClass.getMethod("getResources");
+   resourceInformationGetNameMethod = 
resourceInfoClass.getMethod("getName");
+   resourceInformationGetValueMethod = 
resourceInfoClass.getMethod("getValue");
+   resourceInformationNewInstanceMethod = 
resourceInfoClass.getMethod("newInstance", String.class, long.class);
+   isYarnResourceTypesAvailable = true;
+   } catch (Exception e) {
+   LOG.debug("The underlying Yarn does not support 
external resource.", e);
+   } finally {
+   this.resourceSetResourceInformationMethod = 
resourceSetResourceInformationMethod;
+   this.resourceGetResourcesMethod = 
resourceGetResourcesMethod;
+   this.resourceInformationGetNameMethod = 
resourceInformationGetNameMethod;
+   this.resourceInformationGetValueMethod = 
resourceInformationGetValueMethod;
+   this.resourceInformationNewInstanceMethod = 
resourceInformationN

[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425548306



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.externalresource;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExternalResourceOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Utility class for external resource framework.
+ */
+public class ExternalResourceUtils {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ExternalResourceUtils.class);
+
+   private ExternalResourceUtils() {
+   throw new UnsupportedOperationException("This class should 
never be instantiated.");
+   }
+
+   /**
+* Get the enabled external resource list from configuration.
+*/
+   private static Set getExternalResourceSet(Configuration config) 
{
+   return new HashSet<>(ConfigUtils.decodeListFromConfig(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, x -> x));
+   }
+
+   /**
+* Get the external resources map for Kubernetes. The key should be 
used for deployment specific container request,
+* and values should be the amount of that resource.
+*/
+   public static Map 
getExternalResourcesForKubernetes(Configuration config) {
+   return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX);
+   }
+
+   /**
+* Get the external resources map for Yarn. The key should be used for 
deployment specific container request,
+* and values should be the amount of that resource.
+*/
+   public static Map 
getExternalResourcesForYarn(Configuration config) {
+   return getExternalResources(config, 
ExternalResourceOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX);
+   }
+
+   /**
+* Get the external resources map.
+*/
+   @VisibleForTesting
+   static Map getExternalResources(Configuration config, 
String suffix) {

Review comment:
   I would keep this method to avoid duplication. But I agree that 
`getExternalResourcesForYarn` and `getExternalResourcesForKubernetes` belong 
in their specific modules.
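
A rough sketch of that split, under stated assumptions: the shared helper stays in one place and each deployment module keeps only a thin wrapper. The class names, the `Map<String, String>` stand-in for `Configuration`, the `Long` amounts, and the literal key layout and suffix string are all illustrative assumptions, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shared helper; a plain map stands in for Flink's Configuration. */
final class ExternalResourceSketch {

    /** Maps each deployment-specific config key to the amount of that resource. */
    static Map<String, Long> getExternalResources(Map<String, String> config, String suffix) {
        Map<String, Long> result = new HashMap<>();
        // Assumed layout: a ';'-separated list of enabled resource names.
        String list = config.getOrDefault("external-resources", "");
        for (String rawName : list.split(";")) {
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue;
            }
            String configKey = config.get("external-resource." + name + "." + suffix);
            String amount = config.get("external-resource." + name + ".amount");
            // Skip resources with no key for this deployment or no amount.
            if (configKey == null || amount == null) {
                continue;
            }
            result.put(configKey, Long.parseLong(amount));
        }
        return result;
    }
}

/** Hypothetical wrapper that would live in the Yarn module. */
final class YarnSide {
    static Map<String, Long> getExternalResourcesForYarn(Map<String, String> config) {
        // The suffix value is an assumption for this sketch.
        return ExternalResourceSketch.getExternalResources(config, "yarn.config-key");
    }
}
```

A Kubernetes-side wrapper would look the same with a different suffix, so the parsing logic is written once while the deployment-specific entry points move out of the runtime module.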









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-14 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r425540467



##
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ExternalResourceOptions.java
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for external resources and external resource drivers.
+ */
+@PublicEvolving
+public class ExternalResourceOptions {

Review comment:
   I added an example to `external-resources` and 
`external-resource..driver-factory.class`. I'd like to provide that 
snippet in the user doc.









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-05-06 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r420604022



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/externalresource/ExternalResourceInfo.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.externalresource;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collection;
+
+/**
+ * Contains the information of an external resource.
+ */
+@PublicEvolving
+public interface ExternalResourceInfo {
+
+   /**
+* Get the property indicated by the specified key.
+*/
+   String getProperty(String key);

Review comment:
   I think it probably makes sense to return null here, just as `Map` and 
`Properties` do in Java. However, the code style guide says:
   
   > Always use Optional to return nullable values in the API/public methods 
except the case of a proven performance concern.
   
   I'm not entirely sure whether we should return `Optional` here.
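
   For comparison, a minimal sketch of the `Optional` variant. The interface below is a hypothetical alternative to the one in this PR, and `MapBackedInfo` and the property names are made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalPropertySketch {

    /** Hypothetical variant of the interface under review, returning Optional instead of null. */
    interface ExternalResourceInfo {
        Optional<String> getProperty(String key);
    }

    /** Hypothetical map-backed implementation, for illustration only. */
    static final class MapBackedInfo implements ExternalResourceInfo {
        private final Map<String, String> properties;

        MapBackedInfo(Map<String, String> properties) {
            // Defensive copy so later changes to the argument do not leak in.
            this.properties = new HashMap<>(properties);
        }

        @Override
        public Optional<String> getProperty(String key) {
            // A missing key becomes Optional.empty() rather than null.
            return Optional.ofNullable(properties.get(key));
        }
    }

    public static void main(String[] args) {
        ExternalResourceInfo info = new MapBackedInfo(Collections.singletonMap("index", "0"));
        // Callers must handle the missing case explicitly.
        String index = info.getProperty("index").orElse("unknown");
        String vendor = info.getProperty("vendor").orElse("unknown");
        System.out.println(index + " " + vendor); // prints: 0 unknown
    }
}
```

   The trade-off is one extra allocation per lookup in exchange for making the "no such property" case visible in the signature.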









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-04-29 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r417782697



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/ResourceInformationReflector.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Looks up the methods related to 
org.apache.hadoop.yarn.api.records.Resource#setResourceInformation.
+ * Only supported in Hadoop 3.1+ or 2.10+.
+ */
+public class ResourceInformationReflector {
+
+   static final ResourceInformationReflector INSTANCE = new 
ResourceInformationReflector();
+
+   /** Class used to set the extended resource. */
+   private static final String RESOURCE_INFO_CLASS = 
"org.apache.hadoop.yarn.api.records.ResourceInformation";
+
+   private Class resourceInfoClass;
+   private Method setResourceInfoMethod;
+   private Method getResourceValueMethod;
+   private Method getInfoNameMethod;
+   private Method getInfoValueMethod;
+   private Method resourceInfoNewInstance;
+   private boolean isYarnResourceTypesAvailable;

Review comment:
   I'm not sure it is worth doing that. IMO, it would hurt readability.









[GitHub] [flink] KarmaGYZ commented on a change in pull request #11854: [FLINK-17407] Introduce external resource framework

2020-04-27 Thread GitBox


KarmaGYZ commented on a change in pull request #11854:
URL: https://github.com/apache/flink/pull/11854#discussion_r415807586



##
File path: 
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
##
@@ -198,6 +199,11 @@ public GlobalAggregateManager getGlobalAggregateManager() {
throw new UnsupportedOperationException(ERROR_MSG);
}
 
+   @Override
+   public Map getExternalResourceDrivers() 
{
+   return Collections.emptyMap();
+   }

Review comment:
   After an offline talk with @sjwiesman, I believe it probably makes 
sense to return an empty map here.
   The root cause of the `ITCase` failure is that in 
`BoundedOneInputStreamTaskRunner.mapPartition` we start a new 
`BoundedStreamTask` with a `SavepointEnvironment`. We have to do this because 
there is no way of accessing the actual execution environment from within the 
DataSet API. Eventually, when there is a bounded DataStream API, all of this 
will go away. So, at the moment, I think it is OK to return 
`Collections.emptyMap()`. @sjwiesman, please correct me if I have misstated 
anything.




