[ https://issues.apache.org/jira/browse/BEAM-14267?focusedWorklogId=756678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756678 ]
ASF GitHub Bot logged work on BEAM-14267:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Apr/22 20:33
Start Date: 13/Apr/22 20:33
Worklog Time Spent: 10m
Work Description: Abacn commented on code in PR #17305:
URL: https://github.com/apache/beam/pull/17305#discussion_r849875918
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class GcsMatchIT {
+ /** Integration test for TextIO.MatchAll watching for file updates in gcs filesystem */
+ @Test
+ public void testGcsMatchContinuously() throws InterruptedException {
+ TestPipelineOptions options =
+ TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+ assertNotNull(options.getTempRoot());
+ options.setTempLocation(options.getTempRoot() + "/testGcsMatchContinuouslyTest");
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ String dstFolderName =
+ gcsOptions.getGcpTempLocation()
+ + String.format(
+ "/GcsMatchIT-%tF-%<tH-%<tM-%<tS-%<tL.testGcsMatchContinuously.copy/", new Date());
+ final GcsPath watchPath = GcsPath.fromUri(dstFolderName);
+
+ Pipeline p = Pipeline.create(options);
+
+ PCollection<Metadata> matchAllUpdatedMetadata =
+ p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString()))
+ .apply(
+ "matchAll updated",
+ FileIO.matchAll()
+ .continuously(
+ Duration.millis(250),
+ Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)),
+ true));
+
+ // Copy the files to the "watch" directory;
+ Thread writer =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(1000);
Review Comment:
This test basically duplicates
`org.apache.beam.sdk.io.testMatchWatchForNewFiles`, and I am aware that
adding a sleep makes the test time-consuming. Also curious to see if there is a
good idea
Issue Time Tracking
-------------------
Worklog Id: (was: 756678)
Time Spent: 2h 10m (was: 2h)
> Update watchForNewFiles to allow reading already read files with a new
> timestamp
> --------------------------------------------------------------------------------
>
> Key: BEAM-14267
> URL: https://issues.apache.org/jira/browse/BEAM-14267
> Project: Beam
> Issue Type: New Feature
> Components: io-java-files
> Reporter: Yi Hu
> Assignee: Yi Hu
> Priority: P2
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> In TextIO and AvroIO, we have a configuration option called watchForNewFiles,
> and in FileIO.MatchConfiguration, we have an option called watchInterval.
> Right now, these match any files according to the filtering criteria, and
> then periodically check for new files. A file is determined to be new if it
> has a different filename than a file that has already been read.
> We want to add an option to choose to consider a file new if it has a
> different timestamp from an existing file, even if the file itself has the
> same name.
> See the following design doc for more detail:
> [https://docs.google.com/document/d/1xnacyLGNh6rbPGgTAh5D1gZVR8rHUBsMMRV3YkvlL08/edit?usp=sharing&resourcekey=0-be0uF-DdmwAz6Vg4Li9FNw]
>
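To illustrate the distinction drawn in the issue description above, here is a minimal plain-Java sketch. It is not Beam's implementation: `FileInfo`, `SeenFiles`, and the `matchUpdatedFiles` flag name are hypothetical, chosen only for this example, and the sketch assumes each polled file exposes a name and a last-modified timestamp. The only thing that changes between the two behaviours is the key used to decide whether a file has already been seen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class NewFileDetectionSketch {

  /** Hypothetical stand-in for file metadata: a name plus a last-modified timestamp. */
  static final class FileInfo {
    final String name;
    final long lastModifiedMillis;

    FileInfo(String name, long lastModifiedMillis) {
      this.name = name;
      this.lastModifiedMillis = lastModifiedMillis;
    }
  }

  /** Remembers which files were already emitted across polling rounds. */
  static final class SeenFiles {
    private final boolean matchUpdatedFiles; // hypothetical flag name
    private final Set<String> seenKeys = new HashSet<>();

    SeenFiles(boolean matchUpdatedFiles) {
      this.matchUpdatedFiles = matchUpdatedFiles;
    }

    /** Returns the polled files that count as new and records them as seen. */
    List<FileInfo> filterNew(List<FileInfo> polled) {
      List<FileInfo> fresh = new ArrayList<>();
      for (FileInfo file : polled) {
        // Current behaviour: key on the filename only.
        // Proposed option: key on filename plus timestamp, so a rewritten
        // file with the same name is emitted again.
        String key =
            matchUpdatedFiles ? file.name + "@" + file.lastModifiedMillis : file.name;
        if (seenKeys.add(key)) {
          fresh.add(file);
        }
      }
      return fresh;
    }
  }

  public static void main(String[] args) {
    FileInfo original = new FileInfo("a.txt", 1000L);
    FileInfo rewritten = new FileInfo("a.txt", 2000L); // same name, newer timestamp

    SeenFiles byName = new SeenFiles(false);
    byName.filterNew(Arrays.asList(original));
    // The rewritten file is ignored because its name was already seen.
    System.out.println(byName.filterNew(Arrays.asList(rewritten)).size()); // prints 0

    SeenFiles byNameAndTimestamp = new SeenFiles(true);
    byNameAndTimestamp.filterNew(Arrays.asList(original));
    // The rewritten file is emitted again because its timestamp differs.
    System.out.println(byNameAndTimestamp.filterNew(Arrays.asList(rewritten)).size()); // prints 1
  }
}
```

Keeping the choice inside the key construction means the polling loop itself does not need to know which mode is active; whether the real change is structured this way is not stated in this message.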