[ https://issues.apache.org/jira/browse/BEAM-14267?focusedWorklogId=756666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756666 ]

ASF GitHub Bot logged work on BEAM-14267:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/22 20:17
            Start Date: 13/Apr/22 20:17
    Worklog Time Spent: 10m 
      Work Description: johnjcasey commented on code in PR #17305:
URL: https://github.com/apache/beam/pull/17305#discussion_r849856522


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -647,6 +699,14 @@ public void populateDisplayData(DisplayData.Builder builder) {
       builder.include("configuration", getConfiguration());
     }
 
+    /** Helper function creating a watch transform based on outputKeyFn. */
+    private <KeyT> Watch.Growth<String, MatchResult.Metadata, KeyT> CreateWatchTransform(

Review Comment:
   This is a method, so its name should start with a lowercase letter to match Java naming conventions.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java:
##########
@@ -683,6 +743,18 @@ public String apply(MatchResult.Metadata input) {
         return input.resourceId().toString();
       }
     }
+
+    private static class ExtractFilenameAndLastUpdateFn
+        implements SerializableFunction<MatchResult.Metadata, KV<String, Long>> {
+      @Override
+      public KV<String, Long> apply(MatchResult.Metadata input) throws RuntimeException {
+        long timestamp = input.lastModifiedMillis();
+        if (0L == timestamp) {
+          throw new RuntimeException("Extract file timestamp failed.");

Review Comment:
   Let's be more descriptive here. Most likely, this will come up when a user does something unusual, and clear error messages are super helpful.
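
One way to act on this comment is shown below as a plain-Java sketch. It avoids Beam types so it stands alone. The class and method names are hypothetical. The returned `Map.Entry` stands in for Beam's `KV<String, Long>`, and `IllegalStateException` is swapped in for the bare `RuntimeException`. The sketch also assumes that a `lastModifiedMillis` of 0 means the filesystem did not report a modification time.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical, Beam-free stand-in for the body of ExtractFilenameAndLastUpdateFn.apply. */
final class FileTimestamps {
  private FileTimestamps() {}

  /** Pairs a file name with its last-modified time, failing with an actionable message. */
  static Map.Entry<String, Long> filenameAndLastModified(String resourceId, long lastModifiedMillis) {
    // Assumption: 0 means the filesystem did not report a modification time for this file.
    if (lastModifiedMillis == 0L) {
      throw new IllegalStateException(
          String.format(
              "Could not determine the last-modified time of file '%s' (lastModifiedMillis was 0). "
                  + "Treating updated files as new requires a filesystem that reports "
                  + "modification timestamps.",
              resourceId));
    }
    return new SimpleImmutableEntry<>(resourceId, lastModifiedMillis);
  }
}
```

The message names the offending file and states what went wrong and why it matters. A user who hits this on an unusual filesystem can then diagnose it without reading the source.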



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GcsMatchIT {
+  /** Integration test for TextIO.MatchAll watching for file updates in gcs filesystem */
+  @Test
+  public void testGcsMatchContinuously() throws InterruptedException {
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+    assertNotNull(options.getTempRoot());
+    options.setTempLocation(options.getTempRoot() + "/testGcsMatchContinuouslyTest");
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+    String dstFolderName =
+        gcsOptions.getGcpTempLocation()
+            + String.format(
+                "/GcsMatchIT-%tF-%<tH-%<tM-%<tS-%<tL.testGcsMatchContinuously.copy/", new Date());
+    final GcsPath watchPath = GcsPath.fromUri(dstFolderName);
+
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<Metadata> matchAllUpdatedMetadata =
+        p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString()))
+            .apply(
+                "matchAll updated",
+                FileIO.matchAll()
+                    .continuously(
+                        Duration.millis(250),
+                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)),
+                        true));
+
+    // Copy the files to the "watch" directory;
+    Thread writer =
+        new Thread(
+            () -> {
+              try {
+                Thread.sleep(1000);

Review Comment:
   It makes sense that we would need to sleep in a test to validate this functionality, but sleeping in tests is usually not good practice. @chamikaramj or @pabloem, is there a better pattern to use here?
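
A common alternative to a fixed sleep is to poll an observable condition against a deadline. It is sketched here as a generic Java helper, not as any Beam testing API. The helper still sleeps, but only in short increments and only until the condition holds. The test therefore neither waits longer than necessary nor depends on a guessed delay. `TestWaits` and `waitUntil` are hypothetical names, and this sketch leaves open what the condition should be in GcsMatchIT.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: waits for a condition instead of sleeping a fixed amount. */
final class TestWaits {
  private TestWaits() {}

  /**
   * Evaluates {@code condition} every {@code pollInterval} until it returns true or {@code timeout}
   * elapses. Returns true if the condition was met and false on timeout or interruption.
   */
  static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    Instant deadline = Instant.now().plus(timeout);
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (!Instant.now().isBefore(deadline)) {
        return false; // Timed out without the condition becoming true.
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller.
        return false;
      }
    }
  }
}
```

A caller would typically wrap the result in JUnit's `assertTrue(...)`, so a timeout fails the test with a clear signal instead of letting it proceed on a guess.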





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756666)
    Time Spent: 1h 50m  (was: 1h 40m)

> Update watchForNewFiles to allow reading already read files with a new timestamp
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14267
>                 URL: https://issues.apache.org/jira/browse/BEAM-14267
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-files
>            Reporter: Yi Hu
>            Assignee: Yi Hu
>            Priority: P2
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> In TextIO and AvroIO, we have a configuration option called watchForNewFiles, 
> and in FileIO.MatchConfiguration, we have an option called watchInterval. 
> Right now, these match any files according to the filtering criteria, and 
> then periodically check for new files. A file is determined to be new if it 
> has a different filename than a file that has already been read.
> We want to add an option to also consider a file new if it has a 
> different timestamp from an existing file, even if it has the same 
> name.
> See the following design doc for more detail:
> [https://docs.google.com/document/d/1xnacyLGNh6rbPGgTAh5D1gZVR8rHUBsMMRV3YkvlL08/edit?usp=sharing&resourcekey=0-be0uF-DdmwAz6Vg4Li9FNw]
>  
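
The two notions of "new" in the description can be contrasted in plain Java. In the PR under review, this choice appears to be expressed through the Watch output key. The key is either the filename alone or the filename plus last-modified time that `ExtractFilenameAndLastUpdateFn` produces as a `KV<String, Long>`. The `NewFileTracker` class below is a hypothetical, in-memory illustration of that keying choice, not Beam's Watch implementation.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical tracker contrasting "new by name" with "new by name and timestamp". */
final class NewFileTracker {
  private final boolean matchUpdatedFiles;
  private final Set<Object> seenKeys = new HashSet<>();

  NewFileTracker(boolean matchUpdatedFiles) {
    this.matchUpdatedFiles = matchUpdatedFiles;
  }

  /** Returns true if this (filename, lastModifiedMillis) observation should be treated as new. */
  boolean isNew(String filename, long lastModifiedMillis) {
    // By name only: a file is new the first time its name is seen.
    // By name and timestamp: a previously seen name counts again once its timestamp changes.
    Object key = matchUpdatedFiles ? Map.entry(filename, lastModifiedMillis) : filename;
    return seenKeys.add(key); // Set.add returns false if the key was already present.
  }
}
```

With name-only keys, a rewritten `a.txt` is ignored after its first appearance. With name-and-timestamp keys, it is reported again each time its last-modified time changes.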



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
