[https://issues.apache.org/jira/browse/BEAM-6611?focusedWorklogId=279077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279077]

ASF GitHub Bot logged work on BEAM-6611:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jul/19 15:29
            Start Date: 18/Jul/19 15:29
    Worklog Time Spent: 10m 
      Work Description: ttanay commented on pull request #8871: [BEAM-6611] 
BigQuery file loads in Streaming for Python SDK
URL: https://github.com/apache/beam/pull/8871#discussion_r304981325
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
 ##########
 @@ -550,6 +562,25 @@ def verify(self):
                        'loaded into BigQuery. Please provide a GCS bucket, or '
                        'pass method="STREAMING_INSERTS" to WriteToBigQuery.'
                        % self._custom_gcs_temp_location.get())
+    if self.is_streaming_pipeline and not self.triggering_frequency:
+      raise ValueError('triggering_frequency must be specified to use file '
+                       'loads in streaming')
+    elif not self.is_streaming_pipeline and self.triggering_frequency:
+      raise ValueError('triggering_frequency can only be used with file '
+                       'loads in streaming')
+
+  def _window_fn(self):
+    if self.is_streaming_pipeline:
+      return beam.WindowInto(beam.window.GlobalWindows(),
+                             trigger=trigger.Repeatedly(
+                                 trigger.AfterAny(
+                                     trigger.AfterProcessingTime(
+                                         self.triggering_frequency),
+                                     trigger.AfterCount(
+                                         _FILE_TRIGGERING_RECORD_COUNT))),
+                             accumulation_mode=trigger.AccumulationMode\
+                              .DISCARDING)
+    return beam.WindowInto(beam.window.GlobalWindows())
 
 Review comment:
   Sure. I'll make the change.
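The two checks added to `verify()` enforce one rule: streaming file loads require a `triggering_frequency`, and batch file loads must not set one. The sketch below pulls that rule out into a hypothetical standalone function, `check_file_load_trigger_config`, which is not part of the Beam API. It keeps a trailing space on the first half of each concatenated message so the error reads "file loads" rather than "fileloads".

```python
from typing import Optional


def check_file_load_trigger_config(is_streaming_pipeline: bool,
                                   triggering_frequency: Optional[int]) -> None:
  """Hypothetical standalone version of the checks added to verify().

  Raises ValueError when streaming file loads lack a triggering frequency,
  or when a triggering frequency is set for a batch pipeline.
  """
  if is_streaming_pipeline and not triggering_frequency:
    raise ValueError('triggering_frequency must be specified to use file '
                     'loads in streaming')
  elif not is_streaming_pipeline and triggering_frequency:
    raise ValueError('triggering_frequency can only be used with file '
                     'loads in streaming')


# Valid combinations pass silently.
check_file_load_trigger_config(True, 60)
check_file_load_trigger_config(False, None)
```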
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 279077)
    Time Spent: 5h  (was: 4h 50m)

> A Python Sink for BigQuery with File Loads in Streaming
> -------------------------------------------------------
>
>                 Key: BEAM-6611
>                 URL: https://issues.apache.org/jira/browse/BEAM-6611
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Tanay Tummalapalli
>            Priority: Major
>              Labels: gsoc, gsoc2019, mentor
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> The Java SDK supports several methods for writing data into BigQuery, 
> while the Python SDK supports only the following:
> - Streaming inserts for streaming pipelines [As seen in [bigquery.py and 
> BigQueryWriteFn|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L649-L813]]
> - File loads for batch pipelines [As implemented in [PR 
> 7655|https://github.com/apache/beam/pull/7655]]
> Quick-and-dirty early design doc: https://s.apache.org/beam-bqfl-py-streaming
> The Java SDK also supports File Loads for Streaming pipelines [see BatchLoads 
> application|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1709-L1776].
> File loads have the advantage of being much cheaper than streaming inserts, 
> although records take longer to show up in the table.
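The latency trade-off comes from batching. In the `_window_fn` from the PR diff, streaming records sit in the global window until `Repeatedly(AfterAny(AfterProcessingTime(triggering_frequency), AfterCount(_FILE_TRIGGERING_RECORD_COUNT)))` fires. The pane is emitted when either the count is reached or `triggering_frequency` seconds have passed since its first record. `DISCARDING` mode then starts the next pane empty. The pure-Python simulation below is a simplified, single-key sketch of those semantics, not Beam code. The function `simulate_file_load_batches` is hypothetical, and `record_count` stands in for `_FILE_TRIGGERING_RECORD_COUNT`, whose real value is not shown here. A real runner would fire the last pane from a processing-time timer. The sketch instead flushes it at the end of the input.

```python
from typing import List, Sequence, Tuple


def simulate_file_load_batches(
    arrivals: Sequence[Tuple[float, str]],
    triggering_frequency: float,
    record_count: int) -> List[List[str]]:
  """Hypothetical simulation of Repeatedly(AfterAny(time, count)) panes.

  arrivals: (processing_time_seconds, record) pairs sorted by time.
  Returns the list of panes (batches) in the order they would fire.
  """
  batches: List[List[str]] = []
  pane: List[str] = []
  pane_start = 0.0
  for t, record in arrivals:
    # AfterProcessingTime: the timer set at the pane's first record has
    # already expired, so that pane fired before this record arrived.
    if pane and t >= pane_start + triggering_frequency:
      batches.append(pane)
      pane = []  # DISCARDING: the next pane starts empty.
    if not pane:
      pane_start = t
    pane.append(record)
    # AfterCount: fire as soon as the pane holds enough records.
    if len(pane) >= record_count:
      batches.append(pane)
      pane = []
  # Assumption: flush the leftover pane (a runner's timer would fire it).
  if pane:
    batches.append(pane)
  return batches


print(simulate_file_load_batches(
    [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (10, 'e'), (25, 'f')],
    triggering_frequency=5, record_count=3))
# → [['a', 'b', 'c'], ['d'], ['e'], ['f']]
```

The first batch fires on count, and the remaining ones fire on time. Each fired pane would then become one round of files and load jobs. Raising `triggering_frequency` therefore means fewer, larger loads and a longer wait before records appear.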



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
