[ https://issues.apache.org/jira/browse/BEAM-14267?focusedWorklogId=756666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756666 ]
ASF GitHub Bot logged work on BEAM-14267:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Apr/22 20:17
Start Date: 13/Apr/22 20:17
Worklog Time Spent: 10m
Work Description: johnjcasey commented on code in PR #17305:
URL: https://github.com/apache/beam/pull/17305#discussion_r849856522
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -647,6 +699,14 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder.include("configuration", getConfiguration());
}
+ /** Helper function creating a watch transform based on outputKeyFn. */
+ private <KeyT> Watch.Growth<String, MatchResult.Metadata, KeyT> CreateWatchTransform(
Review Comment:
This is a method, so its name should start with a lowercase letter to match
Java convention (for example, createWatchTransform).
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -683,6 +743,18 @@ public String apply(MatchResult.Metadata input) {
return input.resourceId().toString();
}
}
+
+ private static class ExtractFilenameAndLastUpdateFn
+ implements SerializableFunction<MatchResult.Metadata, KV<String, Long>> {
+ @Override
+ public KV<String, Long> apply(MatchResult.Metadata input) throws RuntimeException {
+ long timestamp = input.lastModifiedMillis();
+ if (0L == timestamp) {
+ throw new RuntimeException("Extract file timestamp failed.");
Review Comment:
Let's be more descriptive here. This will most likely come up when a user
does something unusual, and clear error messages are super helpful.
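A minimal sketch of what a more descriptive message could look like. The class
`TimestampCheck`, the method `requireLastModified`, and the message wording are
hypothetical and not taken from the PR; the sketch also assumes that a value of
0 means no last-modified timestamp was supplied.

```java
/** Hypothetical stand-in for the timestamp check; not the PR's actual code. */
final class TimestampCheck {
  private TimestampCheck() {}

  /**
   * Returns lastModifiedMillis unchanged, or throws with a message that names
   * the resource and states the assumed cause.
   */
  static long requireLastModified(String resourceId, long lastModifiedMillis) {
    if (lastModifiedMillis == 0L) {
      // Assumption: 0 means the timestamp was not supplied for this resource.
      throw new RuntimeException(
          String.format(
              "Could not extract the last-modified timestamp of %s: the reported value was 0. "
                  + "Matching updated files needs a last-modified time for every matched file.",
              resourceId));
    }
    return lastModifiedMillis;
  }
}
```

Naming the resource in the message lets a user see which file triggered the
failure without attaching a debugger.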
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GcsMatchIT {
+ /** Integration test for TextIO.MatchAll watching for file updates in gcs filesystem */
+ @Test
+ public void testGcsMatchContinuously() throws InterruptedException {
+ TestPipelineOptions options =
+ TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+ assertNotNull(options.getTempRoot());
+ options.setTempLocation(options.getTempRoot() + "/testGcsMatchContinuouslyTest");
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ String dstFolderName =
+ gcsOptions.getGcpTempLocation()
+ + String.format(
+ "/GcsMatchIT-%tF-%<tH-%<tM-%<tS-%<tL.testGcsMatchContinuously.copy/", new Date());
+ final GcsPath watchPath = GcsPath.fromUri(dstFolderName);
+
+ Pipeline p = Pipeline.create(options);
+
+ PCollection<Metadata> matchAllUpdatedMetadata =
+ p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString()))
+ .apply(
+ "matchAll updated",
+ FileIO.matchAll()
+ .continuously(
+ Duration.millis(250),
+ Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)),
+ true));
+
+ // Copy the files to the "watch" directory;
+ Thread writer =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(1000);
Review Comment:
It makes sense that we would need to sleep in a test to validate this
functionality, but sleeping in tests is not usually a good practice.
@chamikaramj or @pabloem is there a better pattern to use here?
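One pattern sometimes used in place of a fixed sleep is to poll for a condition
with a deadline. The sketch below only illustrates that general idea: the
helper `Await.until` is hypothetical, is not part of Beam or of this PR, and
whether this test has a condition suitable for polling is an assumption.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test helper; not part of Beam or of this PR. */
final class Await {
  private Await() {}

  /**
   * Polls the condition until it holds or the timeout elapses.
   * Returns true if the condition became true, false on timeout or interrupt.
   */
  static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (!condition.getAsBoolean()) {
      // Subtraction keeps the comparison correct even if nanoTime wraps around.
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

The helper still sleeps internally, but the wait ends as soon as the condition
holds, so the test is bounded by the timeout rather than tuned to a fixed delay.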
Issue Time Tracking
-------------------
Worklog Id: (was: 756666)
Time Spent: 1h 50m (was: 1h 40m)
> Update watchForNewFiles to allow reading already read files with a new timestamp
> --------------------------------------------------------------------------------
>
> Key: BEAM-14267
> URL: https://issues.apache.org/jira/browse/BEAM-14267
> Project: Beam
> Issue Type: New Feature
> Components: io-java-files
> Reporter: Yi Hu
> Assignee: Yi Hu
> Priority: P2
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> In TextIO and AvroIO, we have a configuration option called watchForNewFiles,
> and in FileIO.MatchConfiguration, we have an option called watchInterval.
> Right now, these match any files according to the filtering criteria, and
> then periodically check for new files. A file is determined to be new if it
> has a different filename than a file that has already been read.
> We want to add an option to choose to consider a file new if it has a
> different timestamp from an existing file, even if the file itself has the
> same name.
> See the following design doc for more detail:
> [https://docs.google.com/document/d/1xnacyLGNh6rbPGgTAh5D1gZVR8rHUBsMMRV3YkvlL08/edit?usp=sharing&resourcekey=0-be0uF-DdmwAz6Vg4Li9FNw]
>
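The difference between the two notions of "new" described above can be sketched
as a choice of deduplication key. This is a simplified illustration and not
Beam's Watch implementation; the class `NewFileTracker` and its flag
`matchUpdatedFiles` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of filename-only vs. filename-plus-timestamp keys. */
final class NewFileTracker {
  private final Set<String> seenKeys = new HashSet<>();
  private final boolean matchUpdatedFiles;

  NewFileTracker(boolean matchUpdatedFiles) {
    this.matchUpdatedFiles = matchUpdatedFiles;
  }

  /** Returns true the first time the key for this file is observed. */
  boolean isNew(String filename, long lastModifiedMillis) {
    // Current behavior: the key is the filename alone.
    // Proposed option: the key also includes the last-modified timestamp.
    String key = matchUpdatedFiles ? filename + "@" + lastModifiedMillis : filename;
    return seenKeys.add(key);
  }
}
```

With the flag off, a rewritten `a.txt` is ignored after its first sighting; with
the flag on, the same name with a different timestamp is reported again.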