pabloem commented on a change in pull request #12151:
URL: https://github.com/apache/beam/pull/12151#discussion_r451286482



##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -644,6 +747,26 @@ public void populateDisplayData(DisplayData.Builder builder) {
       return toBuilder().setUserDataMapper(userDataMapper).build();
     }
 
+    public Write<T> withFlushTimeLimit(Duration triggeringFrequency) {
+      return toBuilder().setFlushTimeLimit(triggeringFrequency).build();
+    }
+
+    public Write<T> withSnowPipe(String snowPipe) {
+      return toBuilder().setSnowPipe(ValueProvider.StaticValueProvider.of(snowPipe)).build();
+    }
+
+    public Write<T> withSnowPipe(ValueProvider<String> snowPipe) {
+      return toBuilder().setSnowPipe(snowPipe).build();
+    }
+
+    public Write<T> withShardsNumber(Integer shardsNumber) {
+      return toBuilder().setShardsNumber(shardsNumber).build();
+    }
+
+    public Write<T> withFlushRowLimit(Integer rowsCount) {
+      return toBuilder().setFlushRowLimit(rowsCount).build();
+    }

Review comment:
       Please add detailed Javadoc for these functions, as they will be the point of entry for users - it will also help me understand what a snow pipe is : )
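
    For illustration, something along these lines would help (a sketch only; the exact Snowpipe semantics are yours to confirm):

```java
/**
 * Sets the maximum time that incoming records are buffered before being
 * flushed to a staged file (streaming writes only).
 *
 * @param triggeringFrequency flush interval, e.g. Duration.standardSeconds(30).
 */
public Write<T> withFlushTimeLimit(Duration triggeringFrequency) {
  return toBuilder().setFlushTimeLimit(triggeringFrequency).build();
}

/**
 * Sets the name of the Snowpipe (Snowflake's continuous data ingestion
 * service) that loads the staged files into the target table.
 *
 * @param snowPipe name of the pipe created in Snowflake.
 */
public Write<T> withSnowPipe(String snowPipe) {
  return toBuilder().setSnowPipe(ValueProvider.StaticValueProvider.of(snowPipe)).build();
}
```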

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -684,14 +819,61 @@ private void checkArguments() {
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
 
-      checkArgument(getTable() != null, "withTable() is required");
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+        checkArgument(getSnowPipe() != null, "withSnowPipe() is required");

Review comment:
       Please improve the error message by specifying that withSnowPipe() is required for streaming / unbounded PCollections.
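
    A sketch of possible wording (the message text is just a suggestion):

```java
checkArgument(
    getSnowPipe() != null,
    "withSnowPipe() is required when writing an unbounded (streaming) PCollection");
```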

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -684,14 +819,61 @@ private void checkArguments() {
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
 
-      checkArgument(getTable() != null, "withTable() is required");
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+        checkArgument(getSnowPipe() != null, "withSnowPipe() is required");
+      } else {
+        checkArgument(getTable() != null, "to() is required");

Review comment:
       Please make the error message more informative
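
    For example, something like (a sketch):

```java
checkArgument(
    getTable() != null,
    "to() is required when writing a bounded PCollection to a Snowflake table");
```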

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -684,14 +819,61 @@ private void checkArguments() {
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
 
-      checkArgument(getTable() != null, "withTable() is required");
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+        checkArgument(getSnowPipe() != null, "withSnowPipe() is required");
+      } else {
+        checkArgument(getTable() != null, "to() is required");
+      }
     }
 
-    private PCollection<String> write(PCollection<T> input, String stagingBucketDir) {
+    private PCollection<T> writeStream(PCollection<T> input, String stagingBucketDir) {
       SnowflakeService snowflakeService =
-          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+          getSnowflakeService() != null
+              ? getSnowflakeService()
+              : new SnowflakeStreamingServiceImpl();
+
+      /* Ensure that files will be created after specific record count or duration specified */
+      PCollection<T> inputInGlobalWindow =
+          input.apply(
+              "rewindowIntoGlobal",
+              Window.<T>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterFirst.of(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(getFlushTimeLimit()),
+                              AfterPane.elementCountAtLeast(getFlushRowLimit()))))
+                  .discardingFiredPanes());
+
+      int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
+      PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+
+      /* Ensuring that files will be ingested after flush time */
+      files =
+          (PCollection)
+              files.apply(
+                  "applyUserTrigger",
+                  Window.<T>into(new GlobalWindows())
+                      .triggering(
+                          Repeatedly.forever(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(getFlushTimeLimit())))
+                      .discardingFiredPanes());
+      files =
+          (PCollection)
+              files.apply(
+                  "Create list of logs to copy",
+                  Combine.globally(new Concatenate()).withoutDefaults());

Review comment:
       We are concatenating this in memory. Is this intended? I guess you would end up with a number of files equal to the number of `shards`, which shouldn't be too large?

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -684,14 +819,61 @@ private void checkArguments() {
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is 
required");
 
-      checkArgument(getTable() != null, "withTable() is required");
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+        checkArgument(getSnowPipe() != null, "withSnowPipe() is required");
+      } else {
+        checkArgument(getTable() != null, "to() is required");
+      }
     }
 
-    private PCollection<String> write(PCollection<T> input, String stagingBucketDir) {
+    private PCollection<T> writeStream(PCollection<T> input, String stagingBucketDir) {
       SnowflakeService snowflakeService =
-          getSnowflakeService() != null ? getSnowflakeService() : new SnowflakeServiceImpl();
+          getSnowflakeService() != null
+              ? getSnowflakeService()
+              : new SnowflakeStreamingServiceImpl();
+
+      /* Ensure that files will be created after specific record count or duration specified */
+      PCollection<T> inputInGlobalWindow =
+          input.apply(
+              "rewindowIntoGlobal",
+              Window.<T>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterFirst.of(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(getFlushTimeLimit()),
+                              AfterPane.elementCountAtLeast(getFlushRowLimit()))))
+                  .discardingFiredPanes());
+
+      int shards = (getShardsNumber() > 0) ? getShardsNumber() : DEFAULT_STREAMING_SHARDS_NUMBER;
+      PCollection files = writeFiles(inputInGlobalWindow, stagingBucketDir, shards);
+
+      /* Ensuring that files will be ingested after flush time */
+      files =
+          (PCollection)
+              files.apply(
+                  "applyUserTrigger",
+                  Window.<T>into(new GlobalWindows())
+                      .triggering(
+                          Repeatedly.forever(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(getFlushTimeLimit())))
+                      .discardingFiredPanes());

Review comment:
       This PCollection should keep the trigger configuration from upstream. Did you find that you needed this?

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeBatchServiceConfig.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.services;
+
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+public class SnowflakeBatchServiceConfig extends ServiceConfig {

Review comment:
       Can you add javadoc for this class? It'll help understand its role, and it'll help users figure out if they're meant to use this class or not.
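
    Something along these lines, for instance (a sketch; please also state whether the class is meant to be used directly):

```java
/**
 * Holds all parameters for a single batch read or write executed by a
 * {@link SnowflakeService}: the data source, the staged files, the target
 * table or query, the write disposition, the storage integration and the
 * staging bucket.
 */
public class SnowflakeBatchServiceConfig extends ServiceConfig {
```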

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -654,28 +777,40 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
 
     /**
-     * A snowflake service which is supposed to be used. Note: Currently we have {@link
-     * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
+     * A snowflake service {@link SnowflakeService} implementation which is supposed to be used.
      *
      * @param snowflakeService - an instance of {@link SnowflakeService}.
      */
     public Write<T> withSnowflakeService(SnowflakeService snowflakeService) {
       return toBuilder().setSnowflakeService(snowflakeService).build();
     }
 
+    public Write<T> withQuotationMark(String quotationMark) {
+      return toBuilder().setQuotationMark(quotationMark).build();
+    }
+
+    public Write<T> withDebugMode(StreamingLogLevel debugLevel) {
+      return toBuilder().setDebugMode(debugLevel).build();
+    }

Review comment:
       Please add javadoc
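
    For instance (a sketch; please correct the semantics if I have them wrong):

```java
/** Sets the quotation mark used when writing CSV files to the staging bucket. */
public Write<T> withQuotationMark(String quotationMark) {
  return toBuilder().setQuotationMark(quotationMark).build();
}

/** Sets the verbosity at which Snowpipe ingest responses are logged. */
public Write<T> withDebugMode(StreamingLogLevel debugLevel) {
  return toBuilder().setDebugMode(debugLevel).build();
}
```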

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceImpl.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.services;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import net.snowflake.ingest.SimpleIngestManager;
+import net.snowflake.ingest.connection.IngestResponseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Implementation of {@link SnowflakeService} used in production. */
+public class SnowflakeStreamingServiceImpl
+    implements SnowflakeService<SnowflakeStreamingServiceConfig> {
+  private static final Logger LOG = LoggerFactory.getLogger(SnowflakeStreamingServiceImpl.class);
+  private transient SimpleIngestManager ingestManager;
+
+  @Override
+  public void write(SnowflakeStreamingServiceConfig config) throws Exception {
+    ingest(config);
+  }
+
+  @Override
+  public String read(SnowflakeStreamingServiceConfig config) throws Exception {
+    throw new UnsupportedOperationException("Not supported by SnowflakeIO.");
+  }
+
+  public void ingest(SnowflakeStreamingServiceConfig config)
+      throws IngestResponseException, IOException, URISyntaxException {
+    List<String> filesList = config.filesList;
+    String stagingBucketDir = config.stagingBucketDir;
+    ingestManager = config.ingestManager;
+
+    List<String> newList =
+        filesList.stream()
+            .map(e -> e.replaceAll(String.valueOf(stagingBucketDir), ""))
+            .map(e -> e.replaceAll("'", ""))
+            .collect(Collectors.toList());
+
+    Set<String> files = new TreeSet<>();
+    for (String file : newList) {
+      files.add(file);
+    }

Review comment:
       I am a little confused about these lines (53-62). What do they do?
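
    As far as I can tell, they strip the staging-bucket prefix and the quote characters from each file path, then deduplicate the names through a TreeSet. If that is the intent, collecting straight into the set would be more direct, and replace() would avoid replaceAll() treating stagingBucketDir as a regex (a sketch):

```java
// Strip the staging prefix and quotes, then deduplicate (and sort) in one pass.
Set<String> files =
    filesList.stream()
        .map(e -> e.replace(stagingBucketDir, "").replace("'", ""))
        .collect(Collectors.toCollection(TreeSet::new));
```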

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -352,15 +385,16 @@
       return toBuilder().setCoder(coder).build();
     }
 
+    public Read<T> withQuotationMark(String quotationMark) {

Review comment:
       Javadoc please
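
    For example (a sketch; please adjust if I misread the read path):

```java
/**
 * Sets the quotation mark wrapping fields in the CSV files that Snowflake
 * unloads to the staging bucket before they are read.
 */
public Read<T> withQuotationMark(String quotationMark) {
  return toBuilder().setQuotationMark(quotationMark).build();
}
```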

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -828,31 +1037,152 @@ private String quoteField(String field, String quotation) {
         String stagingBucketDir,
         String storageIntegrationName,
         WriteDisposition writeDisposition,
-        SnowflakeService snowflakeService) {
+        SnowflakeService snowflakeService,
+        String quotationMark) {
       this.dataSourceProviderFn = dataSourceProviderFn;
-      this.table = table;
       this.query = query;
+      this.table = table;
       this.stagingBucketDir = stagingBucketDir;
       this.storageIntegrationName = storageIntegrationName;
       this.writeDisposition = writeDisposition;
       this.snowflakeService = snowflakeService;
+      this.quotationMark = quotationMark;
+
+      DataSourceProviderFromDataSourceConfiguration dataSourceProviderFromDataSourceConfiguration =
+          (DataSourceProviderFromDataSourceConfiguration) this.dataSourceProviderFn;
+      DataSourceConfiguration config = dataSourceProviderFromDataSourceConfiguration.getConfig();
+
+      this.database = config.getDatabase();
+      this.schema = config.getSchema();
     }
 
     @ProcessElement
     public void processElement(ProcessContext context) throws Exception {
-      SnowflakeServiceConfig config =
-          new SnowflakeServiceConfig(
+      SnowflakeBatchServiceConfig config =
+          new SnowflakeBatchServiceConfig(
               dataSourceProviderFn,
               (List<String>) context.element(),
+              database,
+              schema,
               table,
               query,
               writeDisposition,
               storageIntegrationName,
-              stagingBucketDir);
+              stagingBucketDir,
+              quotationMark);
       snowflakeService.write(config);
     }
   }
 
+  /** Custom DoFn that streams data to Snowflake table. */
+  private static class StreamToTableFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+    private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+    private final String stagingBucketDir;
+    private final ValueProvider<String> snowPipe;
+    private final StreamingLogLevel debugMode;
+    private final SnowflakeService snowflakeService;
+    private transient SimpleIngestManager ingestManager;
+
+    private transient DataSource dataSource;
+    ArrayList<String> trackedFilesNames;
+
+    StreamToTableFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn,
+        ValueProvider<String> snowPipe,
+        String stagingBucketDir,
+        StreamingLogLevel debugMode,
+        SnowflakeService snowflakeService) {
+      this.dataSourceProviderFn = dataSourceProviderFn;
+      this.stagingBucketDir = stagingBucketDir;
+      this.snowPipe = snowPipe;
+      this.debugMode = debugMode;
+      this.snowflakeService = snowflakeService;
+      trackedFilesNames = new ArrayList<>();
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      dataSource = dataSourceProviderFn.apply(null);
+
+      DataSourceProviderFromDataSourceConfiguration dataSourceProviderFromDataSourceConfiguration =
+          (DataSourceProviderFromDataSourceConfiguration) this.dataSourceProviderFn;
+      DataSourceConfiguration config = dataSourceProviderFromDataSourceConfiguration.getConfig();
+
+      checkArgument(config.getPrivateKey() != null, "KeyPair is required for authentication");
+
+      String hostName = config.getServerName();
+      List<String> path = Splitter.on('.').splitToList(hostName);
+      String account = path.get(0);
+      String username = config.getUsername();
+      PrivateKey privateKey = config.getPrivateKey();
+      String schema = config.getSchema();
+      String database = config.getDatabase();
+      String snowPipeName = String.format("%s.%s.%s", database, schema, snowPipe.get());
+
+      this.ingestManager =
+          new SimpleIngestManager(
+              account, username, snowPipeName, privateKey, "https", hostName, 443);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) throws Exception {
+      List<String> filesList = (List<String>) context.element();
+
+      if (debugMode != null) {
+        trackedFilesNames.addAll(filesList);
+      }
+      SnowflakeStreamingServiceConfig config =
+          new SnowflakeStreamingServiceConfig(filesList, this.stagingBucketDir, this.ingestManager);
+      snowflakeService.write(config);
+    }
+
+    @FinishBundle
+    public void finishBundle() throws Exception {
+      if (debugMode != null) {

Review comment:
       TODO(pabloem) Review this section

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.services;
+
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+
+public class SnowflakeStreamingServiceConfig extends ServiceConfig {
+  SimpleIngestManager ingestManager;
+  List<String> filesList;
+  String stagingBucketDir;

Review comment:
       Maybe these fields should be private and final since we have getters? 
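
    For example, matching the constructor call quoted earlier in this review (a sketch):

```java
public class SnowflakeStreamingServiceConfig extends ServiceConfig {
  private final List<String> filesList;
  private final String stagingBucketDir;
  private final SimpleIngestManager ingestManager;

  public SnowflakeStreamingServiceConfig(
      List<String> filesList, String stagingBucketDir, SimpleIngestManager ingestManager) {
    this.filesList = filesList;
    this.stagingBucketDir = stagingBucketDir;
    this.ingestManager = ingestManager;
  }
  // existing getters unchanged
}
```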

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/services/SnowflakeStreamingServiceConfig.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.services;
+
+import java.util.List;
+import net.snowflake.ingest.SimpleIngestManager;
+
+public class SnowflakeStreamingServiceConfig extends ServiceConfig {

Review comment:
       Add Javadoc to this class? (including whether it's considered part of the public API)
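
    For instance (a sketch; please also say whether this is public API):

```java
/**
 * Parameters for a single Snowpipe ingest call made by
 * {@link SnowflakeStreamingServiceImpl}: the staged files to load, the
 * staging bucket they live in, and the {@link SimpleIngestManager} to use.
 */
public class SnowflakeStreamingServiceConfig extends ServiceConfig {
```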

##########
File path: sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.snowflake.test.unit.write;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import net.snowflake.client.jdbc.SnowflakeSQLException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
+import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
+import org.apache.beam.sdk.io.snowflake.services.SnowflakeService;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeBasicDataSource;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeDatabase;
+import org.apache.beam.sdk.io.snowflake.test.FakeSnowflakeStreamingServiceImpl;
+import org.apache.beam.sdk.io.snowflake.test.TestUtils;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.hamcrest.MatcherAssert;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class StreamingWriteTest {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingWriteTest.class);
+  private static final String FAKE_TABLE = "TEST_TABLE";
+  private static final String STAGING_BUCKET_NAME = "BUCKET/";
+  private static final String STORAGE_INTEGRATION_NAME = "STORAGE_INTEGRATION";
+  private static final String SNOW_PIPE = "Snowpipe";
+  private static final Instant START_TIME = new Instant(0);
+
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule public ExpectedException exceptionRule = ExpectedException.none();
+  private static SnowflakeIO.DataSourceConfiguration dataSourceConfiguration;
+  private static SnowflakeService snowflakeService;
+  private static SnowflakePipelineOptions options;
+  private static List<Long> testData;
+
+  private static final List<String> SENTENCES =

Review comment:
       TODO(pabloem) Review these tests

##########
File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
##########
@@ -719,7 +906,9 @@ private void checkArguments() {
                           return getUserDataMapper().mapRow(element);
                         }
                       }))
-              .apply("Map Objects array to CSV lines", ParDo.of(new 
MapObjectsArrayToCsvFn()))
+              .apply(
+                  "Map Objects array to CSV lines",
+                  ParDo.of(new MapObjectsArrayToCsvFn(getQuotationMark())))

Review comment:
       I know this was discussed earlier, but I am somewhat concerned about using CSV as the write format. Is this not risky? Have you verified that it works well for all types (Numeric, Date/Time, Decimal, Strings with newlines, etc.)?
    
    I recommend adding an integration test that covers all - or most - data types once we have the Snowflake instance to test against.
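
    To make the concern concrete, the round trip should survive values like these (hypothetical test data):

```java
// Values that commonly break naive CSV handling: embedded quotes and
// newlines, negative decimals, timestamps, nulls and large longs.
List<Object[]> rows =
    Arrays.asList(
        new Object[] {"O'Reilly says \"hi\"\nsecond line", 3.14159, null},
        new Object[] {"2020-07-07T12:34:56Z", -0.0001, 9999999999L});
```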




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

