[
https://issues.apache.org/jira/browse/BEAM-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293568#comment-16293568
]
ASF GitHub Bot commented on BEAM-1507:
--
jkff closed pull request #3167: [BEAM-1507] adding TTL check for staging
location
URL: https://github.com/apache/beam/pull/3167
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index d18e306cfe8..a510f802c42 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -22,17 +22,25 @@
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.dataflow.model.DataflowPackage;
+
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utility class for staging files to GCS.
*/
public class GcsStager implements Stager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GcsStager.class);
+
private DataflowPipelineOptions options;
private GcsStager(DataflowPipelineOptions options) {
@@ -46,7 +54,10 @@ public static GcsStager fromOptions(PipelineOptions options) {
@Override
public List stageFiles() {
+
checkNotNull(options.getStagingLocation());
+warnIfStagingHasTTL();
+
String windmillBinary =
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
if (windmillBinary != null) {
@@ -67,4 +78,19 @@ public static GcsStager fromOptions(PipelineOptions options) {
options.getStagingLocation(),
createOptions);
}
+
+ private void warnIfStagingHasTTL() {
+try {
+ LOG.debug("Checking if staging location {} has TTL assigned", options.getStagingLocation());
+ boolean stagingHasTTL = options.as(GcsOptions.class).getGcsUtil()
+ .bucketHasTTL(GcsPath.fromUri(options.getStagingLocation()));
+ if (stagingHasTTL) {
+LOG.warn("Staging location {} has TTL assigned. This might cause unpredictable bugs."
+, options.getStagingLocation());
+ }
+} catch (Exception ie) {
+ LOG.warn("Exception while trying to determine if staging location has TTL assigned", ie);
+}
+ }
+
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 94b733a7f8d..63b43a46ba4 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -30,6 +30,7 @@
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Bucket.Lifecycle.Rule;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.auto.value.AutoValue;
@@ -454,6 +455,26 @@ public boolean bucketAccessible(GcsPath path) throws IOException {
Sleeper.DEFAULT);
}
+ /**
+ * Returns whether the GCS bucket has TTL assigned.
+ *
+ * @param path the path to GCS bucket
+ * @return true if TTL is assigned to bucket, false otherwise
+ */
+ public boolean bucketHasTTL(GcsPath path) throws IOException {
+Bucket bucket = getBucket(path, createBackOff(), Sleeper.DEFAULT);
+if (bucket != null && bucket.getLifecycle() != null
+&& bucket.getLifecycle().getRule() != null) {
+ for (Rule r : bucket.getLifecycle().getRule()) {
+if ("Delete".equalsIgnoreCase(r.getAction().getType())) {
+ LOG.debug("Bucket {} has TTL assigned", path);
+ return true;
+}
+ }
+}
+return false;
+ }
+
/**
* Returns the project number of the project which owns this bucket.
* If the bucket exists, it must be