This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 649b7fe197c [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3
649b7fe197c is described below

commit 649b7fe197c8b03cce9595adcfea33c8d708a8b4
Author: Shengkai <1059623...@qq.com>
AuthorDate: Thu Sep 7 17:55:31 2023 +0800

    [FLINK-32731][e2e] Fix NameNode uses port 9870 in hadoop3

    This closes #23370
---
 .../org/apache/flink/tests/hive/HiveITCase.java    |   6 +-
 .../flink/tests/hive/containers/HiveContainer.java | 188 ----------------
 .../tests/hive/containers/HiveContainers.java      | 246 +++++++++++++++++++++
 3 files changed, 249 insertions(+), 191 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java
index 5b310688f89..24a759887c5 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/HiveITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.test.resources.ResourceTestUtils;
 import org.apache.flink.test.util.SQLJobSubmission;
-import org.apache.flink.tests.hive.containers.HiveContainer;
+import org.apache.flink.tests.hive.containers.HiveContainers;
 import org.apache.flink.tests.util.flink.ClusterController;
 import org.apache.flink.tests.util.flink.FlinkResource;
 import org.apache.flink.tests.util.flink.FlinkResourceSetup;
@@ -69,8 +69,8 @@ public class HiveITCase extends TestLogger {
     @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
     @ClassRule
-    public static final HiveContainer HIVE_CONTAINER =
-            new HiveContainer(
+    public static final HiveContainers.HiveContainer HIVE_CONTAINER =
+            HiveContainers.createHiveContainer(
                     Arrays.asList("hive_sink1", "hive_sink2", "h_table_sink1", "h_table_sink2"));
 
     private static final String HIVE_ADD_ONE_UDF_CLASS = "HiveAddOneFunc";
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java
deleted file mode 100644
index d4ca4080023..00000000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainer.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.hive.containers;
-
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.test.parameters.ParameterProperty;
-import org.apache.flink.util.DockerImageVersions;
-
-import com.github.dockerjava.api.command.InspectContainerResponse;
-import okhttp3.FormBody;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import org.junit.runner.Description;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.UUID;
-
-/** Test container for Hive. */
-public class HiveContainer extends GenericContainer<HiveContainer> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class);
-    public static final String HOST_NAME = "hadoop-master";
-    public static final int HIVE_METASTORE_PORT = 9083;
-
-    private static final boolean HIVE_310_OR_LATER =
-            HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V3_1_0) >= 0;
-
-    // Detailed log paths are from
-    // https://github.com/prestodb/docker-images/tree/master/prestodb/hdp2.6-hive/files/etc/supervisord.d
-    // https://github.com/prestodb/docker-images/blob/master/prestodb/hive3.1-hive/files/etc/supervisord.conf
-    private static final String NAME_NODE_LOG_PATH =
-            "/var/log/hadoop-hdfs/hadoop-hdfs-namenode.log";
-    private static final String METASTORE_LOG_PATH = "/tmp/hive/hive.log";
-    private static final String MYSQL_METASTORE_LOG_PATH = "/var/log/mysqld.log";
-    private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY =
-            new ParameterProperty<>("logBackupDir", Paths::get);
-    private static final int NAME_NODE_WEB_PORT = 50070;
-
-    private String hiveWarehouseDir;
-
-    public HiveContainer(List<String> initTableNames) {
-        super(
-                DockerImageName.parse(
-                        HIVE_310_OR_LATER ?
-                                DockerImageVersions.HIVE3 : DockerImageVersions.HIVE2));
-        withExtraHost(HOST_NAME, "127.0.0.1");
-        withStartupAttempts(3);
-        addExposedPort(HIVE_METASTORE_PORT);
-        addExposedPort(NAME_NODE_WEB_PORT);
-        mountHiveWarehouseDirToContainer(initTableNames);
-    }
-
-    @Override
-    protected void doStart() {
-        super.doStart();
-        if (LOG.isInfoEnabled()) {
-            followOutput(new Slf4jLogConsumer(LOG));
-        }
-    }
-
-    @Override
-    protected void finished(Description description) {
-        backupLogs();
-        super.finished(description);
-    }
-
-    @Override
-    protected void containerIsStarted(InspectContainerResponse containerInfo) {
-        super.containerIsStarted(containerInfo);
-        final Request request =
-                new Request.Builder()
-                        .post(new FormBody.Builder().build())
-                        .url(
-                                String.format(
-                                        "http://127.0.0.1:%s", getMappedPort(NAME_NODE_WEB_PORT)))
-                        .build();
-        OkHttpClient client = new OkHttpClient();
-        try (Response response = client.newCall(request).execute()) {
-            if (!response.isSuccessful()) {
-                throw new RuntimeException(
-                        String.format(
-                                "The rest request is not successful: %s", response.message()));
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public String getHiveMetastoreURL() {
-        return String.format("thrift://%s:%s", getHost(), getMappedPort(HIVE_METASTORE_PORT));
-    }
-
-    public String getWarehousePath() {
-        return hiveWarehouseDir;
-    }
-
-    private void mountHiveWarehouseDirToContainer(List<String> initTableNames) {
-        try {
-            Path warehousePath = Files.createTempDirectory("hive_warehouse");
-            File file = warehousePath.toFile();
-            setFilePermission(file);
-            hiveWarehouseDir = file.getAbsolutePath();
-            LOG.info("mountHiveWarehouseDirToContainer: " + hiveWarehouseDir);
-
-            if (HIVE_310_OR_LATER) {
-                // if it's hive 3.1+, we should first create the dir for the table,
-                // and set it readable & writable by all, otherwise, it'll throw
-                // permission denied exception when try to write the tables
-                for (String tableName : initTableNames) {
-                    file = Files.createDirectory(warehousePath.resolve(tableName)).toFile();
-                    setFilePermission(file);
-                }
-            }
-
-            withFileSystemBind(
-                    warehousePath.toAbsolutePath().toString(),
-                    warehousePath.toAbsolutePath().toString());
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to create warehouse directory", e);
-        }
-    }
-
-    private void setFilePermission(File file) {
-        file.setReadable(true, false);
-        file.setWritable(true, false);
-        file.setExecutable(true, false);
-    }
-
-    private void backupLogs() {
-        Path path = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get().orElse(null);
-        if (path == null) {
-            LOG.warn(
-                    "Property {} not set, logs will not be backed up.",
-                    DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
-            return;
-        }
-        try {
-            Path dir =
-                    Files.createDirectory(
-                            Paths.get(
-                                    String.valueOf(path.toAbsolutePath()),
-                                    "hive-" + UUID.randomUUID()));
-            copyFileFromContainer(
-                    NAME_NODE_LOG_PATH,
-                    Files.createFile(Paths.get(dir.toAbsolutePath().toString(), "namenode.log"))
-                            .toAbsolutePath()
-                            .toString());
-            copyFileFromContainer(
-                    METASTORE_LOG_PATH,
-                    Files.createFile(Paths.get(dir.toAbsolutePath().toString(), "metastore.log"))
-                            .toAbsolutePath()
-                            .toString());
-            copyFileFromContainer(
-                    MYSQL_METASTORE_LOG_PATH,
-                    Files.createFile(Paths.get(dir.toAbsolutePath().toString(), "mysql.log"))
-                            .toAbsolutePath()
-                            .toString());
-        } catch (Throwable e) {
-            LOG.warn("Failed to backup logs...", e);
-        }
-    }
-}
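Context for the change: the deleted class above hard-codes NAME_NODE_WEB_PORT = 50070, the Hadoop 2 default for the NameNode web UI. Hadoop 3 moved that endpoint to port 9870, so the readiness probe in containerIsStarted() would target a port nothing listens on in the Hive 3 image. The replacement file below therefore splits the container into Hive 2 and Hive 3 variants and lets each report its own container-side port. Simplified, the hook at the heart of the fix (excerpted from the new file) looks like this:

    // Base class: probe readiness against whatever port the subclass reports.
    protected abstract int getNameNodeWebEndpointPort();

    // Hive3Container override: Hadoop 3 serves the NameNode web UI on 9870
    // (it was 50070 on Hadoop 2).
    @Override
    protected int getNameNodeWebEndpointPort() {
        return 9870;
    }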
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainers.java b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainers.java
new file mode 100644
index 00000000000..829612d31dc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-hive/src/test/java/org/apache/flink/tests/hive/containers/HiveContainers.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.hive.containers;
+
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.test.parameters.ParameterProperty;
+import org.apache.flink.util.DockerImageVersions;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import okhttp3.FormBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Factory to create the {@link HiveContainer}. */
+public class HiveContainers {
+
+    public static final String HOST_NAME = "hadoop-master";
+    public static final int HIVE_METASTORE_PORT = 9083;
+
+    private static final boolean HIVE_310_OR_LATER =
+            HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V3_1_0) >= 0;
+    private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY =
+            new ParameterProperty<>("logBackupDir", Paths::get);
+
+    public static HiveContainer createHiveContainer(List<String> initTableNames) {
+        if (HIVE_310_OR_LATER) {
+            return new Hive3Container(initTableNames);
+        } else {
+            return new Hive2Container(initTableNames);
+        }
+    }
+
+    /** Test container for Hive. */
+    public abstract static class HiveContainer extends GenericContainer<HiveContainer> {
+
+        private static final Logger LOG = LoggerFactory.getLogger(HiveContainer.class);
+        protected String hiveWarehouseDir;
+
+        public HiveContainer(String imageName, List<String> initTableNames) {
+            super(DockerImageName.parse(imageName));
+            withExtraHost(HOST_NAME, "127.0.0.1");
+            withStartupAttempts(3);
+            addExposedPort(HIVE_METASTORE_PORT);
+            addExposedPort(getNameNodeWebEndpointPort());
+            mountHiveWarehouseDirToContainer(initTableNames);
+        }
+
+        @Override
+        protected void doStart() {
+            super.doStart();
+            if (LOG.isInfoEnabled()) {
+                followOutput(new Slf4jLogConsumer(LOG));
+            }
+        }
+
+        @Override
+        protected void finished(Description description) {
+            backupLogs();
+            super.finished(description);
+        }
+
+        @Override
+        protected void containerIsStarted(InspectContainerResponse containerInfo) {
+            super.containerIsStarted(containerInfo);
+            final Request request =
+                    new Request.Builder()
+                            .post(new FormBody.Builder().build())
+                            .url(
+                                    String.format(
+                                            "http://127.0.0.1:%s",
+                                            getMappedPort(getNameNodeWebEndpointPort())))
+                            .build();
+            OkHttpClient client = new OkHttpClient();
+            try (Response response = client.newCall(request).execute()) {
+                if (!response.isSuccessful()) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "The rest request is not successful: %s", response.message()));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public String getHiveMetastoreURL() {
+            return String.format("thrift://%s:%s", getHost(), getMappedPort(HIVE_METASTORE_PORT));
+        }
+
+        public String getWarehousePath() {
+            return hiveWarehouseDir;
+        }
+
+        private void mountHiveWarehouseDirToContainer(List<String> initTableNames) {
+            try {
+                Path warehousePath = Files.createTempDirectory("hive_warehouse");
+                File file = warehousePath.toFile();
+                setFilePermission(file);
+                hiveWarehouseDir = file.getAbsolutePath();
+                LOG.info("mountHiveWarehouseDirToContainer: " + hiveWarehouseDir);
+
+                mount(initTableNames);
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to create warehouse directory", e);
+            }
+        }
+
+        protected void setFilePermission(File file) {
+            file.setReadable(true, false);
+            file.setWritable(true, false);
+            file.setExecutable(true, false);
+        }
+
+        private void backupLogs() {
+            Path path = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get().orElse(null);
+            if (path == null) {
+                LOG.warn(
+                        "Property {} not set, logs will not be backed up.",
+                        DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
+                return;
+            }
+            try {
+                Path dir =
+                        Files.createDirectory(
+                                Paths.get(
+                                        String.valueOf(path.toAbsolutePath()),
+                                        "hive-" + UUID.randomUUID()));
+
+                for (String filePathInDocker : getBackupLogPaths()) {
+                    copyFileFromContainer(
+                            filePathInDocker,
+                            dir.resolve(Paths.get(filePathInDocker).getFileName()).toString());
+                }
+            } catch (Throwable e) {
+                LOG.warn("Failed to backup logs...", e);
+            }
+        }
+
+        protected abstract List<String> getBackupLogPaths();
+
+        protected abstract void mount(List<String> initTableNames) throws IOException;
+
+        protected abstract int getNameNodeWebEndpointPort();
+    }
+
+    private static class Hive2Container extends HiveContainer {
+
+        private static final String METASTORE_LOG_PATH = "/tmp/hive/hive.log";
+        private static final String MYSQL_METASTORE_LOG_PATH = "/var/log/mysqld.log";
+        private static final String NAME_NODE_LOG_PATH =
+                "/var/log/hadoop-hdfs/hadoop-hdfs-namenode.log";
+        private static final int NAME_NODE_WEB_PORT = 50070;
+
+        public Hive2Container(List<String> initTableNames) {
+            super(DockerImageVersions.HIVE2, initTableNames);
+        }
+
+        @Override
+        protected List<String> getBackupLogPaths() {
+            // Detailed log paths are from
+            // https://github.com/prestodb/docker-images/tree/master/prestodb/hdp2.6-hive/files/etc/supervisord.d
+            return Arrays.asList(METASTORE_LOG_PATH, MYSQL_METASTORE_LOG_PATH, NAME_NODE_LOG_PATH);
+        }
+
+        @Override
+        protected void mount(List<String> initTableNames) {
+            withFileSystemBind(hiveWarehouseDir, hiveWarehouseDir);
+        }
+
+        @Override
+        protected int getNameNodeWebEndpointPort() {
+            return NAME_NODE_WEB_PORT;
+        }
+    }
+
+    private static class Hive3Container extends HiveContainer {
+
+        private static final String METASTORE_LOG_PATH = "/tmp/root/hive.log";
+        private static final String MARIADB_METASTORE_LOG_PATH = "/var/log/mariadb/mariadb.log";
+        private static final String NAME_NODE_LOG_PATH =
+                "/var/log/hadoop-hdfs/hadoop-hdfs-namenode.log";
+        private static final int NAME_NODE_WEB_PORT = 9870;
+
+        public Hive3Container(List<String> initTableNames) {
+            super(DockerImageVersions.HIVE3, initTableNames);
+        }
+
+        @Override
+        protected List<String> getBackupLogPaths() {
+            // Detailed log paths are from
+            // https://github.com/prestodb/docker-images/blob/master/prestodb/hive3.1-hive/files/etc/supervisord.conf
+            return Arrays.asList(
+                    METASTORE_LOG_PATH, MARIADB_METASTORE_LOG_PATH, NAME_NODE_LOG_PATH);
+        }
+
+        @Override
+        protected void mount(List<String> initTableNames) throws IOException {
+            // we should first create the dir for the table,
+            // and set it readable & writable by all, otherwise, it'll throw
+            // permission denied exception when try to write the tables
+            for (String tableName : initTableNames) {
+                File file =
+                        Files.createDirectory(Paths.get(hiveWarehouseDir).resolve(tableName))
+                                .toFile();
+                setFilePermission(file);
+            }
+            withFileSystemBind(hiveWarehouseDir, hiveWarehouseDir);
+        }
+
+        @Override
+        protected int getNameNodeWebEndpointPort() {
+            return NAME_NODE_WEB_PORT;
+        }
+    }
+}
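For completeness, a minimal usage sketch of the new factory (the test class name below is illustrative; the container API is the one added above, as exercised by HiveITCase in this commit): callers no longer instantiate HiveContainer directly but let HiveContainers pick the Hive 2 or Hive 3 variant, and with it the correct NameNode web port.

    import org.apache.flink.tests.hive.containers.HiveContainers;

    import org.junit.ClassRule;

    import java.util.Arrays;

    public class ExampleHiveTest {

        // The factory checks the Hive version on the classpath (via HiveShimLoader)
        // and returns a Hive2Container (NameNode web UI on 50070) or a
        // Hive3Container (9870); the readiness probe then targets the right port.
        @ClassRule
        public static final HiveContainers.HiveContainer HIVE_CONTAINER =
                HiveContainers.createHiveContainer(Arrays.asList("hive_sink1", "hive_sink2"));

        // Tests read the host-mapped metastore endpoint off the container,
        // e.g. to configure a HiveCatalog:
        //     String metastoreUri = HIVE_CONTAINER.getHiveMetastoreURL();
    }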