This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 280df72  [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
280df72 is described below

commit 280df7236a796d5b175f94952bea07b207ddf0d8
Author: zhangmang <zhangma...@163.com>
AuthorDate: Tue Jun 2 11:26:32 2020 +0800

    [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
    
    This closes #12146
---
 .../flink/addons/hbase/TableInputFormat.java       |  4 ++
 .../hbase/source/AbstractTableInputFormat.java     | 13 ++++
 .../connector/hbase/source/HBaseInputFormat.java   | 13 +++-
 .../hbase/source/HBaseRowDataInputFormat.java      | 12 +---
 .../hbase/source/HBaseRowInputFormat.java          | 11 +--
 .../connector/hbase/HBaseConnectorITCase.java      | 12 ++--
 .../connector/hbase/example/HBaseReadExample.java  |  3 +-
 .../flink/connector/hbase/util/HBaseTestBase.java  |  1 -
 .../hbase/util/HBaseTestingClusterAutoStarter.java | 83 +---------------------
 9 files changed, 40 insertions(+), 112 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 9be258e..a5e044e 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -31,4 +31,8 @@ import 
org.apache.flink.connector.hbase.source.HBaseInputFormat;
 public abstract class TableInputFormat<T extends Tuple> extends 
HBaseInputFormat<T> {
        private static final long serialVersionUID = 1L;
 
+       public TableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+               super(hConf);
+       }
+
 }
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
index 684afd2..94d36d3 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
@@ -24,8 +24,10 @@ import 
org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.core.io.InputSplitAssigner;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -60,6 +62,13 @@ abstract class AbstractTableInputFormat<T> extends 
RichInputFormat<T, TableInput
        protected byte[] currentRow;
        protected long scannedRows;
 
+       // Configuration is not serializable
+       protected byte[] serializedConfig;
+
+       public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration 
hConf) {
+               serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(hConf);
+       }
+
        /**
         * Returns an instance of Scan that retrieves the required subset of 
records from the HBase table.
         *
@@ -99,6 +108,10 @@ abstract class AbstractTableInputFormat<T> extends 
RichInputFormat<T, TableInput
         */
        public abstract void configure(Configuration parameters);
 
+       protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() 
{
+               return 
HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, 
HBaseConfiguration.create());
+       }
+
        @Override
        public void open(TableInputSplit split) throws IOException {
                if (table == null) {
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
index 972cf0e..45d5b3f 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -37,6 +36,15 @@ public abstract class HBaseInputFormat<T extends Tuple> 
extends AbstractTableInp
        private static final long serialVersionUID = 1L;
 
        /**
+        * Constructs a {@link InputFormat} with hbase configuration to read 
data from hbase.
+        * @param hConf The configuration that connect to hbase.
+        *              At least hbase.zookeeper.quorum and 
zookeeper.znode.parent need to be set.
+        */
+       public HBaseInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+               super(hConf);
+       }
+
+       /**
         * Returns an instance of Scan that retrieves the required subset of 
records from the HBase table.
         * @return The appropriate instance of Scan for this usecase.
         */
@@ -79,8 +87,7 @@ public abstract class HBaseInputFormat<T extends Tuple> 
extends AbstractTableInp
         */
        private HTable createTable() {
                LOG.info("Initializing HBaseConfiguration");
-               //use files found in the classpath
-               org.apache.hadoop.conf.Configuration hConf = 
HBaseConfiguration.create();
+               org.apache.hadoop.conf.Configuration hConf = 
getHadoopConfiguration();
 
                try {
                        return new HTable(hConf, getTableName());
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
index 5a628d9..30be6d8 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.data.RowData;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
@@ -50,15 +49,13 @@ public class HBaseRowDataInputFormat extends 
AbstractTableInputFormat<RowData> {
 
        private transient HBaseSerde serde;
 
-       private transient org.apache.hadoop.conf.Configuration conf;
-
        public HBaseRowDataInputFormat(
                        org.apache.hadoop.conf.Configuration conf,
                        String tableName,
                        HBaseTableSchema schema,
                        String nullStringLiteral) {
+               super(conf);
                this.tableName = tableName;
-               this.conf = conf;
                this.schema = schema;
                this.nullStringLiteral = nullStringLiteral;
        }
@@ -89,13 +86,8 @@ public class HBaseRowDataInputFormat extends 
AbstractTableInputFormat<RowData> {
        }
 
        private void connectToTable() {
-
-               if (this.conf == null) {
-                       this.conf = HBaseConfiguration.create();
-               }
-
                try {
-                       Connection conn = 
ConnectionFactory.createConnection(conf);
+                       Connection conn = 
ConnectionFactory.createConnection(getHadoopConfiguration());
                        super.table = (HTable) 
conn.getTable(TableName.valueOf(tableName));
                } catch (TableNotFoundException tnfe) {
                        LOG.error("The table " + tableName + " not found ", 
tnfe);
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
index f455298..f7100ed 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
@@ -54,12 +53,11 @@ public class HBaseRowInputFormat extends 
AbstractTableInputFormat<Row> implement
        private final String tableName;
        private final HBaseTableSchema schema;
 
-       private transient org.apache.hadoop.conf.Configuration conf;
        private transient HBaseReadWriteHelper readHelper;
 
        public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, 
String tableName, HBaseTableSchema schema) {
+               super(conf);
                this.tableName = tableName;
-               this.conf = conf;
                this.schema = schema;
        }
 
@@ -90,13 +88,8 @@ public class HBaseRowInputFormat extends 
AbstractTableInputFormat<Row> implement
        }
 
        private void connectToTable() {
-
-               if (this.conf == null) {
-                       this.conf = HBaseConfiguration.create();
-               }
-
                try {
-                       Connection conn = 
ConnectionFactory.createConnection(conf);
+                       Connection conn = 
ConnectionFactory.createConnection(getHadoopConfiguration());
                        super.table = (HTable) 
conn.getTable(TableName.valueOf(tableName));
                } catch (TableNotFoundException tnfe) {
                        LOG.error("The table " + tableName + " not found ", 
tnfe);
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 2a1ffa2..7724873 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -54,8 +54,6 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -312,7 +310,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Tuple1<Integer>> result = env
-                       .createInput(new InputFormatForTestTable())
+                       .createInput(new InputFormatForTestTable(getConf()))
                        .reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> 
Tuple1.of(v1.f0 + v2.f0));
 
                List<Tuple1<Integer>> resultSet = result.collect();
@@ -612,9 +610,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
                properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
                properties.put(CONNECTOR_PROPERTY_VERSION, "1");
                properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
-               // get zk quorum from "hbase-site.xml" in classpath
-               String hbaseZk = 
HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
-               properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
+               properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum());
                // schema
                String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
                TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, 
Types.INT);
@@ -705,6 +701,10 @@ public class HBaseConnectorITCase extends HBaseTestBase {
        public static class InputFormatForTestTable extends 
HBaseInputFormat<Tuple1<Integer>> {
                private static final long serialVersionUID = 1L;
 
+               public 
InputFormatForTestTable(org.apache.hadoop.conf.Configuration hConf) {
+                       super(hConf);
+               }
+
                @Override
                protected Scan getScanner() {
                        return new Scan();
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
index 13914e5..026c6d0 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.hbase.source.HBaseInputFormat;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -47,7 +48,7 @@ public class HBaseReadExample {
        public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                @SuppressWarnings("serial")
-               DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new 
HBaseInputFormat<Tuple2<String, String>>() {
+               DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new 
HBaseInputFormat<Tuple2<String, String>>(HBaseConfiguration.create()) {
 
                                @Override
                                public String getTableName() {
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
index 04555cc..e00b986 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
@@ -74,7 +74,6 @@ public abstract class HBaseTestBase extends 
HBaseTestingClusterAutoStarter {
 
        @BeforeClass
        public static void activateHBaseCluster() throws IOException {
-               registerHBaseMiniClusterInClasspath();
                prepareTables();
        }
 
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
index 9e15a2f..4ba1c17 100644
--- 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
@@ -40,24 +40,13 @@ import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * By using this class as the super class of a set of tests you will have a 
HBase testing
@@ -83,8 +72,6 @@ public abstract class HBaseTestingClusterAutoStarter extends 
AbstractTestBase {
        private static HBaseAdmin admin = null;
        private static List<TableName> createdTables = new ArrayList<>();
 
-       private static boolean alreadyRegisteredTestCluster = false;
-
        private static Configuration conf;
 
        protected static void createTable(TableName tableName, byte[][] 
columnFamilyName, byte[][] splitKeys) {
@@ -157,6 +144,7 @@ public abstract class HBaseTestingClusterAutoStarter 
extends AbstractTestBase {
                TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 
-1);
 
                // Make sure the zookeeper quorum value contains the right port 
number (varies per run).
+               LOG.info("Hbase minicluster client port: " + 
TEST_UTIL.getZkCluster().getClientPort());
                TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", 
"localhost:" + TEST_UTIL.getZkCluster().getClientPort());
 
                conf = initialize(TEST_UTIL.getConfiguration());
@@ -170,77 +158,10 @@ public abstract class HBaseTestingClusterAutoStarter 
extends AbstractTestBase {
                return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
        }
 
-       private static File hbaseSiteXmlDirectory;
-       private static File hbaseSiteXmlFile;
-
-       /**
-        * This dynamically generates a hbase-site.xml file that is added to 
the classpath.
-        * This way this HBaseMinicluster can be used by an unmodified 
application.
-        * The downside is that this cannot be 'unloaded' so you can have only 
one per JVM.
-        */
-       public static void registerHBaseMiniClusterInClasspath() {
-               if (alreadyRegisteredTestCluster) {
-                       fail("You CANNOT register a second HBase Testing 
cluster in the classpath of the SAME JVM");
-               }
-               File baseDir = new File(System.getProperty("java.io.tmpdir", 
"/tmp/"));
-               hbaseSiteXmlDirectory = new File(baseDir, 
"unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
-
-               if (!hbaseSiteXmlDirectory.mkdirs()) {
-                       fail("Unable to create output directory " + 
hbaseSiteXmlDirectory + " for the HBase minicluster");
-               }
-
-               assertNotNull("The ZooKeeper for the HBase minicluster is 
missing", TEST_UTIL.getZkCluster());
-
-               createHBaseSiteXml(hbaseSiteXmlDirectory, 
TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
-               addDirectoryToClassPath(hbaseSiteXmlDirectory);
-
-               // Avoid starting it again.
-               alreadyRegisteredTestCluster = true;
-       }
-
        public static Configuration getConf() {
                return conf;
        }
 
-       private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, 
String zookeeperQuorum) {
-               hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, 
"hbase-site.xml");
-               // Create the hbase-site.xml file for this run.
-               try {
-                       String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
-                               "<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>\n" +
-                               "<configuration>\n" +
-                               "  <property>\n" +
-                               "    <name>hbase.zookeeper.quorum</name>\n" +
-                               "    <value>" + zookeeperQuorum + "</value>\n" +
-                               "  </property>\n" +
-                               "</configuration>";
-                       OutputStream fos = new 
FileOutputStream(hbaseSiteXmlFile);
-                       
fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
-                       fos.close();
-               } catch (IOException e) {
-                       fail("Unable to create " + hbaseSiteXmlFile);
-               }
-       }
-
-       private static void addDirectoryToClassPath(File directory) {
-               try {
-                       // Get the classloader actually used by 
HBaseConfiguration
-                       ClassLoader classLoader = 
HBaseConfiguration.create().getClassLoader();
-                       if (!(classLoader instanceof URLClassLoader)) {
-                               fail("We should get a URLClassLoader");
-                       }
-
-                       // Make the addURL method accessible
-                       Method method = 
URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
-                       method.setAccessible(true);
-
-                       // Add the directory where we put the hbase-site.xml to 
the classpath
-                       method.invoke(classLoader, directory.toURI().toURL());
-               } catch (MalformedURLException | NoSuchMethodException | 
IllegalAccessException | InvocationTargetException e) {
-                       fail("Unable to add " + directory + " to classpath 
because of this exception: " + e.getMessage());
-               }
-       }
-
        @AfterClass
        public static void tearDown() throws Exception {
                if (conf == null) {
@@ -249,8 +170,6 @@ public abstract class HBaseTestingClusterAutoStarter 
extends AbstractTestBase {
                }
                LOG.info("HBase minicluster: Shutting down");
                deleteTables();
-               hbaseSiteXmlFile.delete();
-               hbaseSiteXmlDirectory.delete();
                TEST_UTIL.shutdownMiniCluster();
                LOG.info("HBase minicluster: Down");
        }

Reply via email to