[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r806086781 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -52,7 +52,18 @@ */ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore { -private static final String DIRTY_SUFFIX = "_DIRTY"; +@VisibleForTesting static final String FILE_EXTENSION = ".json"; +@VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION; + +@VisibleForTesting +public static boolean hasValidDirtyJobResultStoreEntryExtension(String filename) { +return filename.endsWith(DIRTY_FILE_EXTENSION); +} + +@VisibleForTesting +public static boolean hasValidJobResultStoreEntryExtension(String filename) { +return filename.endsWith(FILE_EXTENSION); Review comment: The constants I use in the `FileSystemJobResultStoreTest` as well. But I kept them `package-private`. The `hasValid*` can be used in other places like the `HAJobRunOnMinioS3StoreITCase` and are, therefore, `public` ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_S
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r806857666 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { @Override public Set getDirtyResultsInternal() throws IOException { final Set dirtyResults = new HashSet<>(); -final FileStatus fs = fileSystem.getFileStatus(this.basePath); -if (fs.isDir()) { -FileStatus[] statuses = fileSystem.listStatus(this.basePath); Review comment: To clarify: I looked through the code once more. The `PrestoS3FileSystem` does not support `getFileStatus` on empty directories. You're right when you said that the `mkdir` call doesn't create anything (see [PrestoS3FileSystem:520](https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L520)). But the `getFileStatus` method tries to get the `FileStatus` of the object at the given path. If that does not exist, it will look for objects having the path as a prefix (through `listObject`). A `FileNotFoundException` is thrown if no objects live underneath the passed path (which corresponds to an empty directory, see [PrestoS3FileSystem:361](https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L361)). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804187457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { @Override public Set getDirtyResultsInternal() throws IOException { final Set dirtyResults = new HashSet<>(); -final FileStatus fs = fileSystem.getFileStatus(this.basePath); -if (fs.isDir()) { -FileStatus[] statuses = fileSystem.listStatus(this.basePath); Review comment: The presto test still fails with this change being reverted. `PrestoS3FileSystem` does not support `getFileStatus` on empty directories. Removing the `isDir` check fixes that issue. I do another round over the presto implementation tomorrow to understand why it is. FLINK-26061 covers the difference between presto and hadoop s3 fs *updated the comment to make more sense out of it* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r806121339 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath()); + +updateConfiguration(config); + +FileSystem.initialize(config, loadPluginManager()); +initializeStorageBackend(config); + +return config; +} + +/** + * Should return the path to the HA storage which will be injected into the Flink configuration. + * + * @see HighAvailabilityOptions#HA_STORAGE_PATH + */ +protected abstract String createHAStoragePath(); + +/** + * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to + * test. + */ +protected abstract void updateConfiguration(Configuration config); + +/** Runs any additional initialization that are necessary before running the actual test. */ +protected void initializeStorageBackend(Configuration config) {} + +@Nullable +protected PluginManager loadPluginManager() { +// by default, no PluginManager is loaded +return null; +} + +@Test +public void testJobExecutionInHaMode() throws E
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r806086781 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -52,7 +52,18 @@ */ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore { -private static final String DIRTY_SUFFIX = "_DIRTY"; +@VisibleForTesting static final String FILE_EXTENSION = ".json"; +@VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION; + +@VisibleForTesting +public static boolean hasValidDirtyJobResultStoreEntryExtension(String filename) { +return filename.endsWith(DIRTY_FILE_EXTENSION); +} + +@VisibleForTesting +public static boolean hasValidJobResultStoreEntryExtension(String filename) { +return filename.endsWith(FILE_EXTENSION); Review comment: I use the constants in the `FileSystemJobResultStoreTest` as well. But I kept them `package-private`. The `hasValid*` methods can be used in other places like the `HAJobRunOnMinioS3StoreITCase` and are, therefore, `public` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805876545 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath()); + +updateConfiguration(config); + +FileSystem.initialize(config, loadPluginManager()); +initializeStorageBackend(config); + +return config; +} + +/** + * Should return the path to the HA storage which will be injected into the Flink configuration. + * + * @see HighAvailabilityOptions#HA_STORAGE_PATH + */ +protected abstract String createHAStoragePath(); + +/** + * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to + * test. + */ +protected abstract void updateConfiguration(Configuration config); + +/** Runs any additional initialization that are necessary before running the actual test. */ +protected void initializeStorageBackend(Configuration config) {} + +@Nullable +protected PluginManager loadPluginManager() { +// by default, no PluginManager is loaded +return null; +} + +@Test +public void testJobExecutionInHaMode() throws E
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805874471 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: Debugging the behavior from the Flink side brings me to the conclusion that it's caused by the response coming from a `listObjects` call. It looks like [Minio PR #13804](https://github.com/minio/minio/pull/13804) is the fix that was added in `RELEASE.2021-12-09T06-19-41Z` that's relevant for us here. The PR description also points to `S3AFileSystem` "utilizing the behavior that is fixed in that PR". I leave it like that without verifying it through a custom build to not put more time into that investigation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805792824 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: Yes, it looks like the hadoop S3 FS issue on the Flink side was caused by Minio... I'm trying to understand it a bit better, still digging through the diff on the minio side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805792824 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: Yes, it looks like the hadoop issue was caused by Minio... I'm trying to understand it a bit better, still digging through the diff on the minio side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805781523 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: I looked into the versioning again. `edge` seems to be a development version: ``` $ docker run --entrypoint="" \ -p 9000:9000 \ -p 9001:9001 \ -v /tmp/minio/data2:/data \ -e "MINIO_ROOT_USER=minio" \ -e "MINIO_ROOT_PASSWORD=minio123" \ -it minio/minio:edge /opt/bin/minio --version minio version DEVELOPMENT.2021-11-23T00-07-23Z ``` It includes a bug that causes `getFileStatus` on an empty directory to fail. The observed behavior is also present in the released version `RELEASE.2021-11-24T23-19-33Z` but fixed in the subsequent version `RELEASE.2021-12-09T06-19-41Z` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805781523 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: I looked into the versioning again. `edge` seems to be a development version: ``` $ docker run --entrypoint="" \ -p 9000:9000 \ -p 9001:9001 \ -v /tmp/minio/data2:/data \ -e "MINIO_ROOT_USER=minio" \ -e "MINIO_ROOT_PASSWORD=minio123" \ -it minio/minio:edge /opt/bin/minio --version minio version DEVELOPMENT.2021-11-23T00-07-23Z ``` It includes a bug that causes `getFileStatus` on an empty directory to fail. The observed behavior is also present in the released version `RELEASE.2021-11-24T23-19-33Z` but fixed in the subsequent version `RELEASE.2021-12-10T23-03-39Z` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805781523 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: I looked into the versioning again. `edge` seems to be a development version: ``` docker run --entrypoint="" \ -p 9000:9000 \ -p 9001:9001 \ -v /tmp/minio/data2:/data \ -e "MINIO_ROOT_USER=minio" \ -e "MINIO_ROOT_PASSWORD=minio123" \ -it minio/minio:edge /opt/bin/minio --version minio version DEVELOPMENT.2021-11-23T00-07-23Z ``` It includes a bug that causes `getFileStatus` on an empty directory to fail. The observed behavior is also present in the released version `RELEASE.2021-11-24T23-19-33Z` but fixed in the subsequent version `RELEASE.2021-12-10T23-03-39Z` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r805781523 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: I looked into the versioning again. `edge` seems to be a development version: ``` docker run --entrypoint="" \ -p 9000:9000 \ -p 9001:9001 \ -v /tmp/minio/data2:/data \ -e "MINIO_ROOT_USER=minio" \ -e "MINIO_ROOT_PASSWORD=minio123" \ -it minio/minio:edge /opt/bin/minio --version minio version DEVELOPMENT.2021-11-23T00-07-23Z ``` It includes a bug that causes `getFileStatus` on an empty directory to fail. The observed behavior is also present in the released version `RELEASE.2021-11-24T23-19-33Z` but fixed in the succeeding version `RELEASE.2021-12-10T23-03-39Z` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804570972 ## File path: flink-filesystems/flink-s3-fs-base/pom.xml ## @@ -235,6 +265,27 @@ under the License. true + + + org.apache.maven.plugins + maven-jar-plugin + true + + + + true + true + + + Review comment: Accidentally added through blind copy&paste from `flink-runtime` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804518375 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) Review comment: You're right. I did a quick test. I removed everything except for the Flink configuration. Initially, I added those lines because I wanted to check whether there are other ways to verify that the MiniCluster is running. ...related to [my comment above](https://github.com/apache/flink/pull/18692#discussion_r803481934) about errors happening during the initialization of the cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804498412 ## File path: flink-filesystems/flink-s3-fs-base/pom.xml ## @@ -221,6 +221,36 @@ under the License. + + + Review comment: I tried to introduce `` in `flink-s3-fs-base` instead. But these dependencies haven't been picked up by the submodules `flink-s3-fs-presto` and `flink-s3-fs-hadoop` (Maven was still complaining about the missing version). I reverted that change... @zentol should this generally work and I just did something and should look into it once more? Or is it just not possible in this use case to utilize ``? 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804498412 ## File path: flink-filesystems/flink-s3-fs-base/pom.xml ## @@ -221,6 +221,36 @@ under the License. + + + Review comment: I tried to introduce `` in `flink-s3-fs-base` instead. But they haven't been picked up by the submodules `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. @zentol should this generally work and I just did something and should look into it once more? Or is it just not possible in this use case to utilize ``? 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804187457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { @Override public Set getDirtyResultsInternal() throws IOException { final Set dirtyResults = new HashSet<>(); -final FileStatus fs = fileSystem.getFileStatus(this.basePath); -if (fs.isDir()) { -FileStatus[] statuses = fileSystem.listStatus(this.basePath); Review comment: The presto test still fails with this change being reverted. `PrestoS3FileSystem` does not call `getFileStatus`. Removing the `isDir` check fixes that issue. I do another round over the presto implementation tomorrow to understand why it is. FLINK-26061 covers the difference between presto and hadoop s3 fs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804199654 ## File path: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ## @@ -588,8 +588,10 @@ public static URI getDefaultFsUri() { * * @param f The path we want information from * @return a FileStatus object - * @throws FileNotFoundException when the path does not exist; IOException see specific - * implementation + * @throws FileNotFoundException when the path does not exist. This also applies to cases where + * the {@code FileSystem} implementation is based on an object store like S3 and the passed + * {@code Path} refers to a directory (even if it was created through {@link #mkdirs(Path)}; Review comment: You're right. I reverted the JavaDoc changes entirely because I feel that it adds more confusion than it helps. FLINK-26061 is still valid, though. ## File path: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ## @@ -684,7 +686,9 @@ public boolean exists(final Path f) throws IOException { /** * Make the given file and all non-existent parents into directories. Has the semantics of Unix - * 'mkdir -p'. Existence of the directory hierarchy is not an error. + * 'mkdir -p'. Existence of the directory hierarchy is not an error. {@code FileSystem} + * implementations being backed by an object store like S3 do not support directories. No + * entities are going to be created in that case. Review comment: You're right. I reverted the JavaDoc changes entirely because I feel that it adds more confusion than it helps. FLINK-26061 is still valid, though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804187457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { @Override public Set getDirtyResultsInternal() throws IOException { final Set dirtyResults = new HashSet<>(); -final FileStatus fs = fileSystem.getFileStatus(this.basePath); -if (fs.isDir()) { -FileStatus[] statuses = fileSystem.listStatus(this.basePath); Review comment: The presto test still fails with this change being reverted. `PrestoS3FileSystem` does not call `getFileStatus`. Removing the `isDir` check fixes that issue. I do another round over the presto implementation tomorrow to understand why it is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804187457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java ## @@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException { @Override public Set getDirtyResultsInternal() throws IOException { final Set dirtyResults = new HashSet<>(); -final FileStatus fs = fileSystem.getFileStatus(this.basePath); -if (fs.isDir()) { -FileStatus[] statuses = fileSystem.listStatus(this.basePath); Review comment: The presto test still fails with this change being reverted. `PrestoS3FileSystem` does not call `getFileStatus` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804182029 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java ## @@ -45,4 +45,6 @@ public static final String PULSAR = "apachepulsar/pulsar:2.8.0"; public static final String CASSANDRA_3 = "cassandra:3.0"; + +public static final String MINIO = "minio/minio:edge"; Review comment: `minio/minio:edge` seems to be an old image version which caused weird errors with the hadoop s3 filesystem. I updated the tag to the most recent release which resolved the issue (@rmetzger used this old image in his test as well). That explains why we observed the same error with the s3 hadoop file system. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804179821 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.Base58; + +import java.time.Duration; +import java.util.Locale; + +/** + * {@code MinioTestContainer} provides a {@code Minio} test instance. 
This code is based on the work + * done by {@code alexkirnsu/minio-testcontainer}. + */ +public class MinioTestContainer extends GenericContainer { + +private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint"; + +private static final int DEFAULT_PORT = 9000; + +private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER"; +private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD"; + +private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; +private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + +private final String accessKey; +private final String secretKey; +private final String defaultBucketName; + +public MinioTestContainer() { +this(randomString("bucket", 6)); +} + +public MinioTestContainer(String defaultBucketName) { +super(DockerImageVersions.MINIO); + +this.accessKey = randomString("accessKey", 10); +// secrets must have at least 8 characters +this.secretKey = randomString("secret", 10); +this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName); + +withNetworkAliases(randomString("minio", 6)); +addExposedPort(DEFAULT_PORT); +withEnv(MINIO_ACCESS_KEY, this.accessKey); +withEnv(MINIO_SECRET_KEY, this.secretKey); +withCommand("server", DEFAULT_STORAGE_DIRECTORY); +setWaitStrategy( +new HttpWaitStrategy() +.forPort(DEFAULT_PORT) +.forPath(HEALTH_ENDPOINT) +.withStartupTimeout(Duration.ofMinutes(2))); +} + +@Override +protected void containerIsStarted(InspectContainerResponse containerInfo) { +super.containerIsStarted(containerInfo); +createDefaultBucket(); +} + +private static String randomString(String prefix, int length) { +return String.format("%s-%s", prefix, Base58.randomString(length).toLowerCase(Locale.ROOT)); +} + +/** Creates {@link AmazonS3} client for accessing the {@code Minio} instance. 
*/ +public AmazonS3 getClient() { +return AmazonS3Client.builder() +.withCredentials( +new AWSStaticCredentialsProvider( +new BasicAWSCredentials(accessKey, secretKey))) +.withPathStyleAccessEnabled(true) +.withEndpointConfiguration( +new AwsClientBuilder.EndpointConfiguration( +getHttpEndpoint(), "unused-region")) +.build(); +} + +private String getHttpEndpoint() { +return String.format("http://%s:%s";, getContainerIpAddress(), getMappedPort(DEFAULT_PORT)); +} + +/** + * Initializes the Minio instance (i.e. creating the default bucket and initializing Flink's + * FileSystems). Additionally, the passed Flink {@link Configuration} is extended by all + * relevant parameter to access the {@code Minio} instance
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804177584 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath()); + +updateConfiguration(config); + +FileSystem.initialize(config, loadPluginManager()); +initializeStorageBackend(config); + +return config; +} + +/** + * Should return the path to the HA storage which will be injected into the Flink configuration. + * + * @see HighAvailabilityOptions#HA_STORAGE_PATH + */ +protected abstract String createHAStoragePath(); + +/** + * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to + * test. + */ +protected abstract void updateConfiguration(Configuration config); + +/** Runs any additional initialization that are necessary before running the actual test. */ +protected void initializeStorageBackend(Configuration config) {} Review comment: Not really. I removed it. Additionally, I replaced the `updateConfiguration` by a `createConfliguration` since it's a cleaner interface method. -- This is an automated message from t
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804176021 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath()); + +updateConfiguration(config); + +FileSystem.initialize(config, loadPluginManager()); +initializeStorageBackend(config); + +return config; +} + +/** + * Should return the path to the HA storage which will be injected into the Flink configuration. + * + * @see HighAvailabilityOptions#HA_STORAGE_PATH + */ +protected abstract String createHAStoragePath(); + +/** + * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to + * test. + */ +protected abstract void updateConfiguration(Configuration config); + +/** Runs any additional initialization that are necessary before running the actual test. */ +protected void initializeStorageBackend(Configuration config) {} + +@Nullable +protected PluginManager loadPluginManager() { Review comment: there's not really a benefit. is removed again 👍 -- This is an automated message from the Apache Git Service. To re
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804175066 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainerTest.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.core.testutils.TestContainerExtension; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.Bucket; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * {@code MinioTestContainerTest} tests some basic functionality provided by {@link + * MinioTestContainer}. 
+ */ +public class MinioTestContainerTest { + +private static final String DEFAULT_BUCKET_NAME = "test-bucket"; + +@RegisterExtension +private static final EachCallbackWrapper> +MINIO_EXTENSION = +new EachCallbackWrapper<>( +new TestContainerExtension<>( +() -> new MinioTestContainer(DEFAULT_BUCKET_NAME))); + +private static MinioTestContainer getTestContainer() { +return MINIO_EXTENSION.getCustomExtension().getTestContainer(); +} + +private static AmazonS3 getClient() { +return getTestContainer().getClient(); +} + +@Test +public void testBucketCreation() { +final String otherBucketName = "other-bucket"; +final Bucket otherBucket = getClient().createBucket(otherBucketName); + +assertThat(otherBucket).isNotNull(); + assertThat(otherBucket).extracting(Bucket::getName).isEqualTo(otherBucketName); + +assertThat(getClient().listBuckets()) +.map(Bucket::getName) +.containsExactlyInAnyOrder( +getTestContainer().getDefaultBucketName(), otherBucketName); +} + +@Test +public void testPutObject() throws IOException { +final String bucketName = "other-bucket"; + +getClient().createBucket(bucketName); +final String objectId = "test-object"; +final String content = "test content"; +getClient().putObject(bucketName, objectId, content); + +final BufferedReader reader = +new BufferedReader( +new InputStreamReader( +getClient().getObject(bucketName, objectId).getObjectContent())); +assertThat(reader.readLine()).isEqualTo(content); +} + +@Test +public void testFlinkConfigurationInitialization() { +final Configuration config = new Configuration(); +getTestContainer().updateConfigAccordingly(config); + +assertThat(config.containsKey("s3.endpoint")).isTrue(); +assertThat(config.containsKey("s3.path.style.access")).isTrue(); +assertThat(config.containsKey("s3.access.key")).isTrue(); +assertThat(config.containsKey("s3.secret.key")).isTrue(); +} + +@Test +public void testDefaultBucketCreation() { +final Configuration config = new Configuration(); 
+getTestContainer().updateConfigAccordingly(config); Review comment: not required. correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804168911 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.Base58; + +import java.time.Duration; +import java.util.Locale; + +/** + * {@code MinioTestContainer} provides a {@code Minio} test instance. 
This code is based on the work + * done by {@code alexkirnsu/minio-testcontainer}. + */ +public class MinioTestContainer extends GenericContainer { + +private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint"; + +private static final int DEFAULT_PORT = 9000; + +private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER"; +private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD"; + +private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; +private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + +private final String accessKey; +private final String secretKey; +private final String defaultBucketName; + +public MinioTestContainer() { +this(randomString("bucket", 6)); +} + +public MinioTestContainer(String defaultBucketName) { +super(DockerImageVersions.MINIO); + +this.accessKey = randomString("accessKey", 10); +// secrets must have at least 8 characters +this.secretKey = randomString("secret", 10); +this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName); + +withNetworkAliases(randomString("minio", 6)); +addExposedPort(DEFAULT_PORT); +withEnv(MINIO_ACCESS_KEY, this.accessKey); +withEnv(MINIO_SECRET_KEY, this.secretKey); +withCommand("server", DEFAULT_STORAGE_DIRECTORY); +setWaitStrategy( +new HttpWaitStrategy() +.forPort(DEFAULT_PORT) +.forPath(HEALTH_ENDPOINT) +.withStartupTimeout(Duration.ofMinutes(2))); +} + +@Override +protected void containerIsStarted(InspectContainerResponse containerInfo) { +super.containerIsStarted(containerInfo); +createDefaultBucket(); +} + +private static String randomString(String prefix, int length) { +return String.format("%s-%s", prefix, Base58.randomString(length).toLowerCase(Locale.ROOT)); +} + +/** Creates {@link AmazonS3} client for accessing the {@code Minio} instance. 
*/ +public AmazonS3 getClient() { +return AmazonS3Client.builder() +.withCredentials( +new AWSStaticCredentialsProvider( +new BasicAWSCredentials(accessKey, secretKey))) +.withPathStyleAccessEnabled(true) +.withEndpointConfiguration( +new AwsClientBuilder.EndpointConfiguration( +getHttpEndpoint(), "unused-region")) +.build(); +} + +private String getHttpEndpoint() { +return String.format("http://%s:%s";, getContainerIpAddress(), getMappedPort(DEFAULT_PORT)); +} + +/** + * Initializes the Minio instance (i.e. creating the default bucket and initializing Flink's + * FileSystems). Additionally, the passed Flink {@link Configuration} is extended by all + * relevant parameter to access the {@code Minio} instance
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804166962 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.Base58; + +import java.time.Duration; +import java.util.Locale; + +/** + * {@code MinioTestContainer} provides a {@code Minio} test instance. 
This code is based on the work + * done by {@code alexkirnsu/minio-testcontainer}. + */ +public class MinioTestContainer extends GenericContainer { + +private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint"; + +private static final int DEFAULT_PORT = 9000; + +private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER"; +private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD"; + +private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; +private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + +private final String accessKey; +private final String secretKey; +private final String defaultBucketName; + +public MinioTestContainer() { +this(randomString("bucket", 6)); +} + +public MinioTestContainer(String defaultBucketName) { +super(DockerImageVersions.MINIO); + +this.accessKey = randomString("accessKey", 10); +// secrets must have at least 8 characters +this.secretKey = randomString("secret", 10); +this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName); + +withNetworkAliases(randomString("minio", 6)); +addExposedPort(DEFAULT_PORT); Review comment: correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804161565 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/TestContainerExtension.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.testutils; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.GenericContainer; + +import javax.annotation.Nullable; + +import java.util.function.Supplier; + +/** + * {@code TestContainerExtension} provides common functionality for {@code TestContainer} + * implementations. + * + * @param <T> The {@link GenericContainer} that shall be managed. + */ +public class TestContainerExtension<T extends GenericContainer<T>> implements CustomExtension { Review comment: I looked into it: The `GenericTestContainer` derives from `FailureDetectingExternalResource` (which is provided by `org.testcontainers:testcontainers`) as well and implements `org.junit.TestRule`. `TestRule` doesn't provide any before/after methods but just an `apply` method. 
The lifecycle methods are not provided and therefore cannot be integrated into the JUnit 5 Extension interface -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r804103917 ## File path: flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3presto; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.Base58; + +import java.time.Duration; +import java.util.Locale; + +/** + * {@code MinioTestContainer} provides a {@code Minio} test instance. 
This code is based on the work + * done by {@code alexkirnsu/minio-testcontainer}. + */ +public class MinioTestContainer extends GenericContainer { + +private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint"; + +private static final int DEFAULT_PORT = 9000; + +private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER"; +private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD"; + +private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; +private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + +private final String accessKey; +private final String secretKey; +private final String defaultBucketName; + +public MinioTestContainer() { +this(randomString("bucket", 6)); +} + +public MinioTestContainer(String defaultBucketName) { +super(DockerImageVersions.MINIO); + +this.accessKey = randomString("accessKey", 10); +// secrets must have at least 8 characters +this.secretKey = randomString("secret", 10); +this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName); + +withNetworkAliases(randomString("minio", 6)); +addExposedPort(DEFAULT_PORT); +withEnv(MINIO_ACCESS_KEY, this.accessKey); +withEnv(MINIO_SECRET_KEY, this.secretKey); +withCommand("server", DEFAULT_STORAGE_DIRECTORY); +setWaitStrategy( +new HttpWaitStrategy() +.forPort(DEFAULT_PORT) +.forPath(HEALTH_ENDPOINT) +.withStartupTimeout(Duration.ofMinutes(2))); +} + +@Override +protected void containerIsStarted(InspectContainerResponse containerInfo) { +super.containerIsStarted(containerInfo); +createDefaultBucket(); +} + +private static String randomString(String prefix, int length) { +return String.format("%s-%s", prefix, Base58.randomString(length).toLowerCase(Locale.ROOT)); +} + +/** Creates {@link AmazonS3} client for accessing the {@code Minio} instance. 
*/ +public AmazonS3 getClient() { +return AmazonS3Client.builder() +.withCredentials( +new AWSStaticCredentialsProvider( +new BasicAWSCredentials(accessKey, secretKey))) +.withPathStyleAccessEnabled(true) +.withEndpointConfiguration( +new AwsClientBuilder.EndpointConfiguration( +getHttpEndpoint(), "unused-region")) +.build(); +} + +private String getHttpEndpoint() { +return String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(DEFAULT_PORT)); +} + +/** + * Initializes the Minio instance (i.e. creating the default bucket and initializing Flink's + * FileSystems). Additionally, the passed Flink {@link Configuration} is extended by all + * relevant parameters to access the {@code Minio} instance
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r803608001 ## File path: flink-filesystems/flink-s3-fs-presto/pom.xml ## @@ -46,6 +46,36 @@ under the License. provided + + + org.apache.flink + flink-runtime + ${project.version} + test + + + + org.apache.flink + flink-runtime + ${project.version} + test-jar + tests Review comment: It took me a bit to realize that you didn't mean the entire dependency but the `<classifier>` :-D Fair enough, I guess, that ended up in the code while trying to integrate the test jars. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r803484034 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath()); + +updateConfiguration(config); + +FileSystem.initialize(config, loadPluginManager()); +initializeStorageBackend(config); + +return config; +} + +/** + * Should return the path to the HA storage which will be injected into the Flink configuration. + * + * @see HighAvailabilityOptions#HA_STORAGE_PATH + */ +protected abstract String createHAStoragePath(); + +/** + * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to + * test. + */ +protected abstract void updateConfiguration(Configuration config); + +/** Runs any additional initialization that are necessary before running the actual test. */ +protected void initializeStorageBackend(Configuration config) {} + +@Nullable +protected PluginManager loadPluginManager() { +// by default, no PluginManager is loaded +return null; +} + +@Test +public void testJobExecutionInHaMode() throws E
[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug
XComp commented on a change in pull request #18692: URL: https://github.com/apache/flink/pull/18692#discussion_r803481934 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.EachCallbackWrapper; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.zookeeper.ZooKeeperExtension; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract} + * methods for initializing a specific {@link FileSystem}. 
+ */ +@ExtendWith(TestLoggerExtension.class) +public abstract class AbstractHaJobRunITCase { + +@RegisterExtension +private static final AllCallbackWrapper ZOOKEEPER_EXTENSION = +new AllCallbackWrapper<>(new ZooKeeperExtension()); + +@RegisterExtension +public final EachCallbackWrapper miniClusterExtension = +new EachCallbackWrapper<>( +new MiniClusterExtension( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(1) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.setConfiguration(getFlinkConfiguration()) +.build())); + +private Configuration getFlinkConfiguration() { +Configuration config = new Configuration(); +config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); +config.set( +HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, +ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString()); + +config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath()); + +updateConfiguration(config); + +FileSystem.initialize(config, loadPluginManager()); +initializeStorageBackend(config); + +return config; +} + +/** + * Should return the path to the HA storage which will be injected into the Flink configuration. + * + * @see HighAvailabilityOptions#HA_STORAGE_PATH + */ +protected abstract String createHAStoragePath(); + +/** + * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to + * test. + */ +protected abstract void updateConfiguration(Configuration config); + +/** Runs any additional initialization that are necessary before running the actual test. */ +protected void initializeStorageBackend(Configuration config) {} + +@Nullable +protected PluginManager loadPluginManager() { +// by default, no PluginManager is loaded +return null; +} + +@Test +public void testJobExecutionInHaMode() throws E