[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-15 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r806086781



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -52,7 +52,18 @@
  */
 public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
 
-    private static final String DIRTY_SUFFIX = "_DIRTY";
+    @VisibleForTesting static final String FILE_EXTENSION = ".json";
+    @VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION;
+
+    @VisibleForTesting
+    public static boolean hasValidDirtyJobResultStoreEntryExtension(String filename) {
+        return filename.endsWith(DIRTY_FILE_EXTENSION);
+    }
+
+    @VisibleForTesting
+    public static boolean hasValidJobResultStoreEntryExtension(String filename) {
+        return filename.endsWith(FILE_EXTENSION);

Review comment:
   I use the constants in the `FileSystemJobResultStoreTest` as well, but I 
kept them `package-private`. The `hasValid*` methods can be used in other 
places like the `HAJobRunOnMinioS3StoreITCase` and are, therefore, `public`.
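
   For illustration only (not part of the PR), a minimal sketch of how such 
extension checks could be used when scanning the entries of a job result store 
directory; the class name, the `listedFileNames` input, and the `main` method are 
hypothetical:
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   class JobResultStoreEntryFilterSketch {

       // Mirrors the constants from the diff above; the real values live in FileSystemJobResultStore.
       private static final String FILE_EXTENSION = ".json";
       private static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION;

       // Keeps only file names that look like dirty job result store entries.
       static List<String> selectDirtyEntries(List<String> listedFileNames) {
           return listedFileNames.stream()
                   .filter(name -> name.endsWith(DIRTY_FILE_EXTENSION))
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           List<String> names = Arrays.asList("job-1_DIRTY.json", "job-2.json", "other.txt");
           System.out.println(selectDirtyEntries(names)); // prints [job-1_DIRTY.json]
       }
   }
   ```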

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper 
ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper 
miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_S

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-15 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r806857666



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         final Set<JobResult> dirtyResults = new HashSet<>();
-        final FileStatus fs = fileSystem.getFileStatus(this.basePath);
-        if (fs.isDir()) {
-            FileStatus[] statuses = fileSystem.listStatus(this.basePath);

Review comment:
   To clarify: I looked through the code once more. The 
`PrestoS3FileSystem` does not support `getFileStatus` on empty directories. 
You're right when you said that the `mkdir` call doesn't create anything (see 
[PrestoS3FileSystem:520](https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L520)).
 But the `getFileStatus` method tries to get the `FileStatus` of the object at 
the given path. If that does not exist, it will look for objects having the 
path as a prefix (through `listObjects`). A `FileNotFoundException` is thrown if 
no objects live underneath the passed path (which corresponds to an empty 
directory, see 
[PrestoS3FileSystem:361](https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L361)).
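
   To make the lookup order above concrete, here is a small self-contained 
sketch (an in-memory stand-in, not the actual `PrestoS3FileSystem` code): look 
for the object at the exact path, fall back to a prefix listing, and throw 
`FileNotFoundException` when nothing lives underneath the path:
   ```java
   import java.io.FileNotFoundException;
   import java.util.Collections;
   import java.util.Set;

   class ObjectStoreStatusSketch {

       /** Models the lookup order: exact object, then prefix listing, then "not found". */
       static String statusOf(Set<String> objectKeys, String path) throws FileNotFoundException {
           if (objectKeys.contains(path)) {
               return "file: " + path; // an object exists at exactly this key
           }
           String prefix = path.endsWith("/") ? path : path + "/";
           boolean hasChildren = objectKeys.stream().anyMatch(key -> key.startsWith(prefix));
           if (hasChildren) {
               return "directory: " + path; // at least one object lives underneath the path
           }
           // Nothing at the path and nothing underneath it: an "empty directory" is invisible.
           throw new FileNotFoundException(path);
       }

       public static void main(String[] args) throws Exception {
           Set<String> keys = Collections.singleton("jrs/job-1_DIRTY.json");
           System.out.println(statusOf(keys, "jrs")); // directory: jrs
           try {
               statusOf(keys, "empty-dir");
           } catch (FileNotFoundException e) {
               System.out.println("not found: " + e.getMessage());
           }
       }
   }
   ```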








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-15 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804187457



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         final Set<JobResult> dirtyResults = new HashSet<>();
-        final FileStatus fs = fileSystem.getFileStatus(this.basePath);
-        if (fs.isDir()) {
-            FileStatus[] statuses = fileSystem.listStatus(this.basePath);

Review comment:
   The presto test still fails with this change reverted. `PrestoS3FileSystem` 
does not support `getFileStatus` on empty directories. Removing the `isDir` 
check fixes that issue. I'll do another round over the presto implementation 
tomorrow to understand why that is. FLINK-26061 covers the difference between 
the presto and hadoop s3 filesystems.
   
   
   *updated the comment to make more sense out of it*
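
   A rough sketch of the direction described here (simplified, not necessarily 
the exact code that ended up in the PR): list the base path directly instead of 
asking for its `FileStatus` first, and treat a missing or empty directory as 
"no dirty results". The dirty-entry suffix is inlined to keep the sketch 
self-contained; error handling is reduced to the minimum:
   ```java
   import org.apache.flink.core.fs.FileStatus;
   import org.apache.flink.core.fs.FileSystem;
   import org.apache.flink.core.fs.Path;

   import java.io.FileNotFoundException;
   import java.io.IOException;
   import java.util.HashSet;
   import java.util.Set;

   class DirtyEntryListingSketch {

       /** Collects paths of dirty job result entries without a prior getFileStatus()/isDir() check. */
       static Set<Path> listDirtyEntryPaths(FileSystem fileSystem, Path basePath) throws IOException {
           final Set<Path> dirtyEntryPaths = new HashSet<>();
           final FileStatus[] statuses;
           try {
               statuses = fileSystem.listStatus(basePath);
           } catch (FileNotFoundException e) {
               // Object stores may not expose an "empty directory" at all.
               return dirtyEntryPaths;
           }
           if (statuses == null) {
               return dirtyEntryPaths;
           }
           for (FileStatus status : statuses) {
               if (!status.isDir() && status.getPath().getName().endsWith("_DIRTY.json")) {
                   dirtyEntryPaths.add(status.getPath());
               }
           }
           return dirtyEntryPaths;
       }
   }
   ```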








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r806121339



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper 
ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper 
miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
createHAStoragePath());
+
+updateConfiguration(config);
+
+FileSystem.initialize(config, loadPluginManager());
+initializeStorageBackend(config);
+
+return config;
+}
+
+/**
+ * Should return the path to the HA storage which will be injected into 
the Flink configuration.
+ *
+ * @see HighAvailabilityOptions#HA_STORAGE_PATH
+ */
+protected abstract String createHAStoragePath();
+
+/**
+ * Updates the passed {@link Configuration} to point to the {@link 
FileSystem} that's subject to
+ * test.
+ */
+protected abstract void updateConfiguration(Configuration config);
+
+/** Runs any additional initialization that are necessary before running 
the actual test. */
+protected void initializeStorageBackend(Configuration config) {}
+
+@Nullable
+protected PluginManager loadPluginManager() {
+// by default, no PluginManager is loaded
+return null;
+}
+
+@Test
+public void testJobExecutionInHaMode() throws E

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r806086781



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -52,7 +52,18 @@
  */
 public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
 
-    private static final String DIRTY_SUFFIX = "_DIRTY";
+    @VisibleForTesting static final String FILE_EXTENSION = ".json";
+    @VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION;
+
+    @VisibleForTesting
+    public static boolean hasValidDirtyJobResultStoreEntryExtension(String filename) {
+        return filename.endsWith(DIRTY_FILE_EXTENSION);
+    }
+
+    @VisibleForTesting
+    public static boolean hasValidJobResultStoreEntryExtension(String filename) {
+        return filename.endsWith(FILE_EXTENSION);

Review comment:
   I use the constants in the `FileSystemJobResultStoreTest` as well, but I 
kept them `package-private`. The `hasValid*` methods can be used in other 
places like the `HAJobRunOnMinioS3StoreITCase` and are, therefore, `public`.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805876545



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper 
ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper 
miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
createHAStoragePath());
+
+updateConfiguration(config);
+
+FileSystem.initialize(config, loadPluginManager());
+initializeStorageBackend(config);
+
+return config;
+}
+
+/**
+ * Should return the path to the HA storage which will be injected into 
the Flink configuration.
+ *
+ * @see HighAvailabilityOptions#HA_STORAGE_PATH
+ */
+protected abstract String createHAStoragePath();
+
+/**
+ * Updates the passed {@link Configuration} to point to the {@link 
FileSystem} that's subject to
+ * test.
+ */
+protected abstract void updateConfiguration(Configuration config);
+
+/** Runs any additional initialization that are necessary before running 
the actual test. */
+protected void initializeStorageBackend(Configuration config) {}
+
+@Nullable
+protected PluginManager loadPluginManager() {
+// by default, no PluginManager is loaded
+return null;
+}
+
+@Test
+public void testJobExecutionInHaMode() throws E

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805874471



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   Debugging the behavior from the Flink side brings me to the conclusion 
that it's caused by the response coming from a `listObjects` call. It looks 
like [Minio PR #13804](https://github.com/minio/minio/pull/13804), which was 
added in `RELEASE.2021-12-09T06-19-41Z`, is the fix that's relevant for us here. 
The PR description also points to `S3AFileSystem` "utilizing the behavior that 
is fixed in that PR". I'll leave it at that without verifying it through a 
custom build, to avoid putting more time into that investigation.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805792824



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   Yes, it looks like the hadoop S3 FS issue on the Flink side was caused by 
Minio... I'm still trying to understand it a bit better by digging through the 
diff on the Minio side.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805792824



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   Yes, it looks like the hadoop issue was caused by Minio... I'm still trying 
to understand it a bit better by digging through the diff on the Minio side.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805781523



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   I looked into the versioning again. `edge` seems to be a development 
version:
   ```
   $ docker run --entrypoint="" \
 -p 9000:9000 \
 -p 9001:9001 \
 -v /tmp/minio/data2:/data \
 -e "MINIO_ROOT_USER=minio" \
 -e "MINIO_ROOT_PASSWORD=minio123" \
 -it minio/minio:edge /opt/bin/minio --version
   minio version DEVELOPMENT.2021-11-23T00-07-23Z
   ```
   It includes a bug that causes `getFileStatus` to fail on an empty 
directory. The observed behavior is also present in the released version 
`RELEASE.2021-11-24T23-19-33Z` but fixed in the subsequent version 
`RELEASE.2021-12-09T06-19-41Z`








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805781523



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   I looked into the versioning again. `edge` seems to be a development 
version:
   ```
   $ docker run --entrypoint="" \
 -p 9000:9000 \
 -p 9001:9001 \
 -v /tmp/minio/data2:/data \
 -e "MINIO_ROOT_USER=minio" \
 -e "MINIO_ROOT_PASSWORD=minio123" \
 -it minio/minio:edge /opt/bin/minio --version
   minio version DEVELOPMENT.2021-11-23T00-07-23Z
   ```
   It includes a bug that causes `getFileStatus` to fail on an empty 
directory. The observed behavior is also present in the released version 
`RELEASE.2021-11-24T23-19-33Z` but fixed in the subsequent version 
`RELEASE.2021-12-10T23-03-39Z`








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805781523



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   I looked into the versioning again. `edge` seems to be a development 
version:
   ```
   docker run --entrypoint="" \
 -p 9000:9000 \
 -p 9001:9001 \
 -v /tmp/minio/data2:/data \
 -e "MINIO_ROOT_USER=minio" \
 -e "MINIO_ROOT_PASSWORD=minio123" \
 -it minio/minio:edge /opt/bin/minio --version
   minio version DEVELOPMENT.2021-11-23T00-07-23Z
   ```
   It includes a bug that causes `getFileStatus` to fail on an empty 
directory. The observed behavior is also present in the released version 
`RELEASE.2021-11-24T23-19-33Z` but fixed in the subsequent version 
`RELEASE.2021-12-10T23-03-39Z`








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-14 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r805781523



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   I looked into the versioning again. `edge` seems to be a development 
version:
   ```
   docker run --entrypoint="" \
 -p 9000:9000 \
 -p 9001:9001 \
 -v /tmp/minio/data2:/data \
 -e "MINIO_ROOT_USER=minio" \
 -e "MINIO_ROOT_PASSWORD=minio123" \
 -it minio/minio:edge /opt/bin/minio --version
   minio version DEVELOPMENT.2021-11-23T00-07-23Z
   ```
   It includes a bug that causes `getFileStatus` to fail on an empty 
directory. The observed behavior is also present in the released version 
`RELEASE.2021-11-24T23-19-33Z` but fixed in the succeeding version 
`RELEASE.2021-12-10T23-03-39Z`








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-11 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804570972



##
File path: flink-filesystems/flink-s3-fs-base/pom.xml
##
@@ -235,6 +265,27 @@ under the License.
true


+
+   
+   org.apache.maven.plugins
+   maven-jar-plugin
+   true
+   
+   
+   
+   
true
+   
true
+   
+   
+   

Review comment:
   Accidentally added through blind copy&paste from `flink-runtime`








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-11 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804518375



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper 
ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper 
miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)

Review comment:
   You're right. I did a quick test. I removed everything except for the 
Flink configuration. Initially, I added those lines because I wanted to check 
whether there are other ways to verify that the MiniCluster is running. 
...related to [my comment 
above](https://github.com/apache/flink/pull/18692#discussion_r803481934) about 
errors happening during the initialization of the cluster.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-11 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804498412



##
File path: flink-filesystems/flink-s3-fs-base/pom.xml
##
@@ -221,6 +221,36 @@ under the License.



+
+   
+   

Review comment:
   I tried to introduce `<dependencyManagement>` in `flink-s3-fs-base` 
instead. But these dependencies haven't been picked up by the submodules 
`flink-s3-fs-presto` and `flink-s3-fs-hadoop` (Maven was still complaining about 
the missing version). I reverted that change... 
   @zentol should this generally work and I just did something wrong and should 
look into it once more? Or is it just not possible in this use case to utilize 
`<dependencyManagement>`? 🤔 








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-11 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804498412



##
File path: flink-filesystems/flink-s3-fs-base/pom.xml
##
@@ -221,6 +221,36 @@ under the License.



+
+   
+   

Review comment:
   I tried to introduce `<dependencyManagement>` in `flink-s3-fs-base` 
instead. But they haven't been picked up by the submodules `flink-s3-fs-presto` 
and `flink-s3-fs-hadoop`. @zentol should this generally work and I just did 
something wrong and should look into it once more? Or is it just not possible 
in this use case to utilize `<dependencyManagement>`? 🤔 








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-11 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804187457



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         final Set<JobResult> dirtyResults = new HashSet<>();
-        final FileStatus fs = fileSystem.getFileStatus(this.basePath);
-        if (fs.isDir()) {
-            FileStatus[] statuses = fileSystem.listStatus(this.basePath);

Review comment:
   The presto test still fails with this change reverted. 
`PrestoS3FileSystem` does not call `getFileStatus`. Removing the `isDir` check 
fixes that issue. I'll do another round over the presto implementation tomorrow 
to understand why that is. FLINK-26061 covers the difference between the presto 
and hadoop s3 filesystems.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804199654



##
File path: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
##
@@ -588,8 +588,10 @@ public static URI getDefaultFsUri() {
  *
  * @param f The path we want information from
  * @return a FileStatus object
- * @throws FileNotFoundException when the path does not exist; IOException 
see specific
- * implementation
+ * @throws FileNotFoundException when the path does not exist. This also 
applies to cases where
+ * the {@code FileSystem} implementation is based on an object store 
like S3 and the passed
+ * {@code Path} refers to a directory (even if it was created through 
{@link #mkdirs(Path)};

Review comment:
   You're right. I reverted the JavaDoc changes entirely because I feel 
that it adds more confusion than it helps. FLINK-26061 is still valid, though.

##
File path: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
##
@@ -684,7 +686,9 @@ public boolean exists(final Path f) throws IOException {
 
 /**
  * Make the given file and all non-existent parents into directories. Has 
the semantics of Unix
- * 'mkdir -p'. Existence of the directory hierarchy is not an error.
+ * 'mkdir -p'. Existence of the directory hierarchy is not an error. 
{@code FileSystem}
+ * implementations being backed by an object store like S3 do not support 
directories. No
+ * entities are going to be created in that case.

Review comment:
   You're right. I reverted the JavaDoc changes entirely because I feel 
that it adds more confusion than it helps. FLINK-26061 is still valid, though.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804187457



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         final Set<JobResult> dirtyResults = new HashSet<>();
-        final FileStatus fs = fileSystem.getFileStatus(this.basePath);
-        if (fs.isDir()) {
-            FileStatus[] statuses = fileSystem.listStatus(this.basePath);

Review comment:
   The presto test still fails with this change reverted. 
`PrestoS3FileSystem` does not call `getFileStatus`. Removing the `isDir` check 
fixes that issue. I'll do another round over the presto implementation tomorrow 
to understand why that is.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804187457



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
##
@@ -155,18 +155,15 @@ public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
         final Set<JobResult> dirtyResults = new HashSet<>();
-        final FileStatus fs = fileSystem.getFileStatus(this.basePath);
-        if (fs.isDir()) {
-            FileStatus[] statuses = fileSystem.listStatus(this.basePath);

Review comment:
   The presto test still fails with this change reverted. 
`PrestoS3FileSystem` does not call `getFileStatus`.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804182029



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/DockerImageVersions.java
##
@@ -45,4 +45,6 @@
 public static final String PULSAR = "apachepulsar/pulsar:2.8.0";
 
 public static final String CASSANDRA_3 = "cassandra:3.0";
+
+public static final String MINIO = "minio/minio:edge";

Review comment:
   `minio/minio:edge` seems to be an old image version that caused weird 
errors with the hadoop s3 filesystem. I updated the tag to the most recent 
release, which resolved the issue (@rmetzger used this old image in his test as 
well). That explains why we observed the same error with the s3 hadoop 
filesystem.
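
   As an illustration of the kind of change described (a sketch only; the 
release tag below is one of the versions quoted in these comments, the tag 
actually chosen in the PR may differ):
   ```java
   // Excerpt sketch of DockerImageVersions in flink-test-utils-junit.
   public class DockerImageVersions {

       public static final String CASSANDRA_3 = "cassandra:3.0";

       // Pin a fixed Minio release instead of the floating "edge" tag; any release containing
       // the listObjects fix (RELEASE.2021-12-09T06-19-41Z or later) avoids the observed error.
       public static final String MINIO = "minio/minio:RELEASE.2021-12-10T23-03-39Z";
   }
   ```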








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804179821



##
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This 
code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer {
+
+private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+private static final int DEFAULT_PORT = 9000;
+
+private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+private final String accessKey;
+private final String secretKey;
+private final String defaultBucketName;
+
+public MinioTestContainer() {
+this(randomString("bucket", 6));
+}
+
+public MinioTestContainer(String defaultBucketName) {
+super(DockerImageVersions.MINIO);
+
+this.accessKey = randomString("accessKey", 10);
+// secrets must have at least 8 characters
+this.secretKey = randomString("secret", 10);
+this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+withNetworkAliases(randomString("minio", 6));
+addExposedPort(DEFAULT_PORT);
+withEnv(MINIO_ACCESS_KEY, this.accessKey);
+withEnv(MINIO_SECRET_KEY, this.secretKey);
+withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+setWaitStrategy(
+new HttpWaitStrategy()
+.forPort(DEFAULT_PORT)
+.forPath(HEALTH_ENDPOINT)
+.withStartupTimeout(Duration.ofMinutes(2)));
+}
+
+@Override
+protected void containerIsStarted(InspectContainerResponse containerInfo) {
+super.containerIsStarted(containerInfo);
+createDefaultBucket();
+}
+
+private static String randomString(String prefix, int length) {
+return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+}
+
+/** Creates {@link AmazonS3} client for accessing the {@code Minio} 
instance. */
+public AmazonS3 getClient() {
+return AmazonS3Client.builder()
+.withCredentials(
+new AWSStaticCredentialsProvider(
+new BasicAWSCredentials(accessKey, secretKey)))
+.withPathStyleAccessEnabled(true)
+.withEndpointConfiguration(
+new AwsClientBuilder.EndpointConfiguration(
+getHttpEndpoint(), "unused-region"))
+.build();
+}
+
+private String getHttpEndpoint() {
+return String.format("http://%s:%s";, getContainerIpAddress(), 
getMappedPort(DEFAULT_PORT));
+}
+
+/**
+ * Initializes the Minio instance (i.e. creating the default bucket and 
initializing Flink's
+ * FileSystems). Additionally, the passed Flink {@link Configuration} is 
extended by all
+ * relevant parameter to access the {@code Minio} instance

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804177584



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper 
ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper 
miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
createHAStoragePath());
+
+updateConfiguration(config);
+
+FileSystem.initialize(config, loadPluginManager());
+initializeStorageBackend(config);
+
+return config;
+}
+
+/**
+ * Should return the path to the HA storage which will be injected into 
the Flink configuration.
+ *
+ * @see HighAvailabilityOptions#HA_STORAGE_PATH
+ */
+protected abstract String createHAStoragePath();
+
+/**
+ * Updates the passed {@link Configuration} to point to the {@link 
FileSystem} that's subject to
+ * test.
+ */
+protected abstract void updateConfiguration(Configuration config);
+
+/** Runs any additional initialization that are necessary before running 
the actual test. */
+protected void initializeStorageBackend(Configuration config) {}

Review comment:
   Not really. I removed it. Additionally, I replaced `updateConfiguration` 
with `createConfiguration` since it's a cleaner interface method.
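
   For clarity, a hypothetical shape of that hook (the name follows the comment; 
the actual signature in the PR may differ):
   ```java
   import org.apache.flink.configuration.Configuration;

   // Sketch of the abstract test base after the refactoring described above.
   abstract class AbstractHaJobRunITCaseSketch {

       /**
        * Creates the FileSystem-specific Flink configuration instead of mutating
        * a configuration instance that is passed in.
        */
       protected abstract Configuration createConfiguration();
   }
   ```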





[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804176021



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides 
{@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper 
ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper 
miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_STORAGE_PATH, 
createHAStoragePath());
+
+updateConfiguration(config);
+
+FileSystem.initialize(config, loadPluginManager());
+initializeStorageBackend(config);
+
+return config;
+}
+
+/**
+ * Should return the path to the HA storage which will be injected into 
the Flink configuration.
+ *
+ * @see HighAvailabilityOptions#HA_STORAGE_PATH
+ */
+protected abstract String createHAStoragePath();
+
+/**
+ * Updates the passed {@link Configuration} to point to the {@link 
FileSystem} that's subject to
+ * test.
+ */
+protected abstract void updateConfiguration(Configuration config);
+
+/** Runs any additional initialization that are necessary before running 
the actual test. */
+protected void initializeStorageBackend(Configuration config) {}
+
+@Nullable
+protected PluginManager loadPluginManager() {

Review comment:
   there's not really a benefit; it's removed again 👍 





[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804175066



##
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainerTest.java
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.core.testutils.TestContainerExtension;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.Bucket;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * {@code MinioTestContainerTest} tests some basic functionality provided by {@link
+ * MinioTestContainer}.
+ */
+public class MinioTestContainerTest {
+
+private static final String DEFAULT_BUCKET_NAME = "test-bucket";
+
+@RegisterExtension
+private static final EachCallbackWrapper<TestContainerExtension<MinioTestContainer>>
+MINIO_EXTENSION =
+new EachCallbackWrapper<>(
+new TestContainerExtension<>(
+() -> new MinioTestContainer(DEFAULT_BUCKET_NAME)));
+
+private static MinioTestContainer getTestContainer() {
+return MINIO_EXTENSION.getCustomExtension().getTestContainer();
+}
+
+private static AmazonS3 getClient() {
+return getTestContainer().getClient();
+}
+
+@Test
+public void testBucketCreation() {
+final String otherBucketName = "other-bucket";
+final Bucket otherBucket = getClient().createBucket(otherBucketName);
+
+assertThat(otherBucket).isNotNull();
+assertThat(otherBucket).extracting(Bucket::getName).isEqualTo(otherBucketName);
+
+assertThat(getClient().listBuckets())
+.map(Bucket::getName)
+.containsExactlyInAnyOrder(
+getTestContainer().getDefaultBucketName(), otherBucketName);
+}
+
+@Test
+public void testPutObject() throws IOException {
+final String bucketName = "other-bucket";
+
+getClient().createBucket(bucketName);
+final String objectId = "test-object";
+final String content = "test content";
+getClient().putObject(bucketName, objectId, content);
+
+final BufferedReader reader =
+new BufferedReader(
+new InputStreamReader(
+getClient().getObject(bucketName, objectId).getObjectContent()));
+assertThat(reader.readLine()).isEqualTo(content);
+}
+
+@Test
+public void testFlinkConfigurationInitialization() {
+final Configuration config = new Configuration();
+getTestContainer().updateConfigAccordingly(config);
+
+assertThat(config.containsKey("s3.endpoint")).isTrue();
+assertThat(config.containsKey("s3.path.style.access")).isTrue();
+assertThat(config.containsKey("s3.access.key")).isTrue();
+assertThat(config.containsKey("s3.secret.key")).isTrue();
+}
+
+@Test
+public void testDefaultBucketCreation() {
+final Configuration config = new Configuration();
+getTestContainer().updateConfigAccordingly(config);

Review comment:
   not required. correct








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804168911



##
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+private static final int DEFAULT_PORT = 9000;
+
+private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+private final String accessKey;
+private final String secretKey;
+private final String defaultBucketName;
+
+public MinioTestContainer() {
+this(randomString("bucket", 6));
+}
+
+public MinioTestContainer(String defaultBucketName) {
+super(DockerImageVersions.MINIO);
+
+this.accessKey = randomString("accessKey", 10);
+// secrets must have at least 8 characters
+this.secretKey = randomString("secret", 10);
+this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+withNetworkAliases(randomString("minio", 6));
+addExposedPort(DEFAULT_PORT);
+withEnv(MINIO_ACCESS_KEY, this.accessKey);
+withEnv(MINIO_SECRET_KEY, this.secretKey);
+withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+setWaitStrategy(
+new HttpWaitStrategy()
+.forPort(DEFAULT_PORT)
+.forPath(HEALTH_ENDPOINT)
+.withStartupTimeout(Duration.ofMinutes(2)));
+}
+
+@Override
+protected void containerIsStarted(InspectContainerResponse containerInfo) {
+super.containerIsStarted(containerInfo);
+createDefaultBucket();
+}
+
+private static String randomString(String prefix, int length) {
+return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+}
+
+/** Creates {@link AmazonS3} client for accessing the {@code Minio} instance. */
+public AmazonS3 getClient() {
+return AmazonS3Client.builder()
+.withCredentials(
+new AWSStaticCredentialsProvider(
+new BasicAWSCredentials(accessKey, secretKey)))
+.withPathStyleAccessEnabled(true)
+.withEndpointConfiguration(
+new AwsClientBuilder.EndpointConfiguration(
+getHttpEndpoint(), "unused-region"))
+.build();
+}
+
+private String getHttpEndpoint() {
+return String.format("http://%s:%s";, getContainerIpAddress(), 
getMappedPort(DEFAULT_PORT));
+}
+
+/**
+ * Initializes the Minio instance (i.e. creating the default bucket and initializing Flink's
+ * FileSystems). Additionally, the passed Flink {@link Configuration} is extended by all
+ * relevant parameters to access the {@code Minio} instance

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804166962



##
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+private static final int DEFAULT_PORT = 9000;
+
+private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+private final String accessKey;
+private final String secretKey;
+private final String defaultBucketName;
+
+public MinioTestContainer() {
+this(randomString("bucket", 6));
+}
+
+public MinioTestContainer(String defaultBucketName) {
+super(DockerImageVersions.MINIO);
+
+this.accessKey = randomString("accessKey", 10);
+// secrets must have at least 8 characters
+this.secretKey = randomString("secret", 10);
+this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+withNetworkAliases(randomString("minio", 6));
+addExposedPort(DEFAULT_PORT);

Review comment:
   correct








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804161565



##
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/TestContainerExtension.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.GenericContainer;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Supplier;
+
+/**
+ * {@code TestContainerExtension} provides common functionality for {@code TestContainer}
+ * implementations.
+ *
+ * @param <T> The {@link GenericContainer} that shall be managed.
+ */
+public class TestContainerExtension<T extends GenericContainer<T>> implements CustomExtension {

Review comment:
   I looked into it: `GenericTestContainer` also derives from `FailureDetectingExternalResource` (which is provided by `org.testcontainers:testcontainers`) and implements `org.junit.TestRule`. `TestRule` doesn't provide any before/after methods, just an `apply` method. The lifecycle methods are therefore not exposed and cannot be integrated into the JUnit 5 Extension interface.
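   A minimal sketch of the pattern this implies, for illustration only: since `TestRule` only exposes `apply`, a JUnit 5 extension has to drive the container's `start()`/`stop()` lifecycle itself. The class name `ManagedContainerExtension` and the direct use of `BeforeEachCallback`/`AfterEachCallback` below are assumptions made for the example; they are not the `TestContainerExtension`/`CustomExtension` code from this PR.

```java
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.GenericContainer;

import java.util.function.Supplier;

/**
 * Illustrative JUnit 5 extension that starts and stops a {@link GenericContainer} around each
 * test by calling the container lifecycle methods directly instead of relying on TestRule#apply.
 */
class ManagedContainerExtension<T extends GenericContainer<T>>
        implements BeforeEachCallback, AfterEachCallback {

    private final Supplier<T> containerFactory;
    private T container;

    ManagedContainerExtension(Supplier<T> containerFactory) {
        this.containerFactory = containerFactory;
    }

    T getContainer() {
        return container;
    }

    @Override
    public void beforeEach(ExtensionContext context) {
        // create and start a fresh container before every test
        container = containerFactory.get();
        container.start();
    }

    @Override
    public void afterEach(ExtensionContext context) {
        // stop the container after the test, even if the test failed
        if (container != null) {
            container.stop();
            container = null;
        }
    }
}
```

   A test would then register the extension via `@RegisterExtension` and fetch the running container through `getContainer()` inside the test body.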








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r804103917



##
File path: 
flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/MinioTestContainer.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/**
+ * {@code MinioTestContainer} provides a {@code Minio} test instance. This code is based on the work
+ * done by {@code alexkirnsu/minio-testcontainer}.
+ */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+private static final int DEFAULT_PORT = 9000;
+
+private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+private final String accessKey;
+private final String secretKey;
+private final String defaultBucketName;
+
+public MinioTestContainer() {
+this(randomString("bucket", 6));
+}
+
+public MinioTestContainer(String defaultBucketName) {
+super(DockerImageVersions.MINIO);
+
+this.accessKey = randomString("accessKey", 10);
+// secrets must have at least 8 characters
+this.secretKey = randomString("secret", 10);
+this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+withNetworkAliases(randomString("minio", 6));
+addExposedPort(DEFAULT_PORT);
+withEnv(MINIO_ACCESS_KEY, this.accessKey);
+withEnv(MINIO_SECRET_KEY, this.secretKey);
+withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+setWaitStrategy(
+new HttpWaitStrategy()
+.forPort(DEFAULT_PORT)
+.forPath(HEALTH_ENDPOINT)
+.withStartupTimeout(Duration.ofMinutes(2)));
+}
+
+@Override
+protected void containerIsStarted(InspectContainerResponse containerInfo) {
+super.containerIsStarted(containerInfo);
+createDefaultBucket();
+}
+
+private static String randomString(String prefix, int length) {
+return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+}
+
+/** Creates {@link AmazonS3} client for accessing the {@code Minio} instance. */
+public AmazonS3 getClient() {
+return AmazonS3Client.builder()
+.withCredentials(
+new AWSStaticCredentialsProvider(
+new BasicAWSCredentials(accessKey, secretKey)))
+.withPathStyleAccessEnabled(true)
+.withEndpointConfiguration(
+new AwsClientBuilder.EndpointConfiguration(
+getHttpEndpoint(), "unused-region"))
+.build();
+}
+
+private String getHttpEndpoint() {
+return String.format("http://%s:%s";, getContainerIpAddress(), 
getMappedPort(DEFAULT_PORT));
+}
+
+/**
+ * Initializes the Minio instance (i.e. creating the default bucket and initializing Flink's
+ * FileSystems). Additionally, the passed Flink {@link Configuration} is extended by all
+ * relevant parameters to access the {@code Minio} instance

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r803608001



##
File path: flink-filesystems/flink-s3-fs-presto/pom.xml
##
@@ -46,6 +46,36 @@ under the License.
provided

 
+   
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+   </dependency>
+
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <classifier>tests</classifier>

Review comment:
   It took me a bit to realize that you didn't mean the entire dependency but the `<classifier>` :-D Fair enough, I guess; that ended up in the code while trying to integrate the test jars.








[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r803484034



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath());
+
+updateConfiguration(config);
+
+FileSystem.initialize(config, loadPluginManager());
+initializeStorageBackend(config);
+
+return config;
+}
+
+/**
+ * Should return the path to the HA storage which will be injected into the Flink configuration.
+ *
+ * @see HighAvailabilityOptions#HA_STORAGE_PATH
+ */
+protected abstract String createHAStoragePath();
+
+/**
+ * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to
+ * test.
+ */
+protected abstract void updateConfiguration(Configuration config);
+
+/** Runs any additional initialization that is necessary before running the actual test. */
+protected void initializeStorageBackend(Configuration config) {}
+
+@Nullable
+protected PluginManager loadPluginManager() {
+// by default, no PluginManager is loaded
+return null;
+}
+
+@Test
+public void testJobExecutionInHaMode() throws E

[GitHub] [flink] XComp commented on a change in pull request #18692: [FLINK-26015] Fixes object store bug

2022-02-10 Thread GitBox


XComp commented on a change in pull request #18692:
URL: https://github.com/apache/flink/pull/18692#discussion_r803481934



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaJobRunITCase.java
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.core.testutils.EachCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@code AbstractHaJobRunITCase} runs a job storing in HA mode and provides {@code abstract}
+ * methods for initializing a specific {@link FileSystem}.
+ */
+@ExtendWith(TestLoggerExtension.class)
+public abstract class AbstractHaJobRunITCase {
+
+@RegisterExtension
+private static final AllCallbackWrapper<ZooKeeperExtension> ZOOKEEPER_EXTENSION =
+new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+@RegisterExtension
+public final EachCallbackWrapper<MiniClusterExtension> miniClusterExtension =
+new EachCallbackWrapper<>(
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(1)
+.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.setConfiguration(getFlinkConfiguration())
+.build()));
+
+private Configuration getFlinkConfiguration() {
+Configuration config = new Configuration();
+config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+config.set(
+HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ZOOKEEPER_EXTENSION.getCustomExtension().getConnectString());
+
+config.set(HighAvailabilityOptions.HA_STORAGE_PATH, createHAStoragePath());
+
+updateConfiguration(config);
+
+FileSystem.initialize(config, loadPluginManager());
+initializeStorageBackend(config);
+
+return config;
+}
+
+/**
+ * Should return the path to the HA storage which will be injected into the Flink configuration.
+ *
+ * @see HighAvailabilityOptions#HA_STORAGE_PATH
+ */
+protected abstract String createHAStoragePath();
+
+/**
+ * Updates the passed {@link Configuration} to point to the {@link FileSystem} that's subject to
+ * test.
+ */
+protected abstract void updateConfiguration(Configuration config);
+
+/** Runs any additional initialization that is necessary before running the actual test. */
+protected void initializeStorageBackend(Configuration config) {}
+
+@Nullable
+protected PluginManager loadPluginManager() {
+// by default, no PluginManager is loaded
+return null;
+}
+
+@Test
+public void testJobExecutionInHaMode() throws E