[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

2020-06-24 Thread GitBox


rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-649232286


   @reuvenlax nudge :-)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] rafi-kamal opened a new pull request #12089: [BEAM-10283] Add new overloads of withKeyRanges and withRowFilter met…

2020-06-24 Thread GitBox


rafi-kamal opened a new pull request #12089:
URL: https://github.com/apache/beam/pull/12089


   …hods that take ValueProvider as a parameter.
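
   The overloads described in the title follow Beam's usual ValueProvider pattern: accept either a concrete value or a deferred one behind a common `.get()` accessor, so templated pipelines can supply the value at runtime. A minimal Python sketch of that pattern — the names below are illustrative, not the actual BigtableIO Java API:

   ```python
   class StaticValueProvider:
       # Minimal stand-in for Beam's ValueProvider: wraps a value known at
       # construction time behind the same .get() interface that a
       # runtime-provided value would expose.
       def __init__(self, value):
           self._value = value

       def get(self):
           return self._value


   def with_row_filter(row_filter):
       # Overload pattern: accept either a plain value or a ValueProvider,
       # normalizing the plain value so downstream code always calls .get().
       if hasattr(row_filter, "get"):
           return row_filter
       return StaticValueProvider(row_filter)
   ```

   The same normalization would apply to `withKeyRanges`: the non-ValueProvider overload simply wraps its argument and delegates.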
   
   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   (Build-status badge table for the Go, Java, and Python SDKs across the Dataflow, Flink, Samza, and Spark runners; truncated in the archive.)

[GitHub] [beam] piotr-szuberski removed a comment on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform

2020-06-24 Thread GitBox


piotr-szuberski removed a comment on pull request #12023:
URL: https://github.com/apache/beam/pull/12023#issuecomment-649225729











[GitHub] [beam] piotr-szuberski removed a comment on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform

2020-06-24 Thread GitBox


piotr-szuberski removed a comment on pull request #12023:
URL: https://github.com/apache/beam/pull/12023#issuecomment-648777087


   Run Python PreCommit







[GitHub] [beam] piotr-szuberski commented on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform

2020-06-24 Thread GitBox


piotr-szuberski commented on pull request #12023:
URL: https://github.com/apache/beam/pull/12023#issuecomment-649225729


   Run Python PreCommit







[GitHub] [beam] kennknowles commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.

2020-06-24 Thread GitBox


kennknowles commented on pull request #12020:
URL: https://github.com/apache/beam/pull/12020#issuecomment-649188578


   TBH I am not sure. I believe that GPL + classpath exception is standard and 
probably best. It is what allows all Java users to make Java programs that are 
not GPL.







[GitHub] [beam] robinyqiu commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


robinyqiu commented on pull request #12054:
URL: https://github.com/apache/beam/pull/12054#issuecomment-649183345


   retest this please







[GitHub] [beam] robinyqiu commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


robinyqiu commented on a change in pull request #12054:
URL: https://github.com/apache/beam/pull/12054#discussion_r445279103



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
##
@@ -27,4 +28,7 @@ private SqlTypes() {}
 
   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
   public static final LogicalType DATE = new Date();
+
+  /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
+  public static final LogicalType TIME = new Time();

Review comment:
   Now the internal build passes.
   
   Thanks for the question. Conversion is lossless. Actually it is done by 
calling ZetaSQL code: `CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime()` 
and `encodePacked64TimeNanos()`. See conversion code in 
`ZetaSqlBeamTranslationUtils.java`. I looked at the code and it seems to be 
purely bit manipulation, so I guess it's not very expensive.
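
   For readers unfamiliar with the packed encoding mentioned above, here is a hedged Python sketch of the bit-packing idea. The field widths (5-bit hour, 6-bit minute, 6-bit second, 30-bit nanos) are an assumption about `CivilTimeEncoder`'s packed64 layout, not a verified copy of it; the point is that the round trip is pure bit manipulation and lossless:

   ```python
   # Assumed packed64 layout, most-significant to least-significant:
   # hour (5 bits) | minute (6 bits) | second (6 bits) | nanos (30 bits)

   def encode_packed64_time_nanos(hour, minute, second, nanos):
       # Shift each field into place; nanos occupies the low 30 bits.
       return (((hour << 6 | minute) << 6 | second) << 30) | nanos

   def decode_packed64_time_nanos(packed):
       # Mask each field back out in the reverse order.
       nanos = packed & ((1 << 30) - 1)
       second = (packed >> 30) & 0x3F
       minute = (packed >> 36) & 0x3F
       hour = (packed >> 42) & 0x1F
       return hour, minute, second, nanos
   ```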









[GitHub] [beam] udim commented on a change in pull request #12088: Add output typehints to GroupIntoBatches

2020-06-24 Thread GitBox


udim commented on a change in pull request #12088:
URL: https://github.com/apache/beam/pull/12088#discussion_r445278906



##
File path: sdks/python/apache_beam/transforms/util.py
##
@@ -741,6 +741,7 @@ def WithKeys(pcoll, k):
 
 @experimental()
 @typehints.with_input_types(Tuple[K, V])
+@typehints.with_output_types(Tuple[K, List[V]])

Review comment:
   cc: @lukecwik 









[GitHub] [beam] udim commented on a change in pull request #12088: Add output typehints to GroupIntoBatches

2020-06-24 Thread GitBox


udim commented on a change in pull request #12088:
URL: https://github.com/apache/beam/pull/12088#discussion_r445278714



##
File path: sdks/python/apache_beam/transforms/util.py
##
@@ -741,6 +741,7 @@ def WithKeys(pcoll, k):
 
 @experimental()
 @typehints.with_input_types(Tuple[K, V])
+@typehints.with_output_types(Tuple[K, List[V]])

Review comment:
   I was thinking it might be `Iterable` rather than `List`, so I looked at the code, and it doesn't seem to group by key at all.
   
   I modified a test from `GroupIntoBatchesTest` to print the elements:
   ```py
   def test_in_global_window(self):
     with TestPipeline() as pipeline:
       def print_elements(e):
         print(e)
         return e

       collection = pipeline \
           | beam.Create(GroupIntoBatchesTest._create_test_data()) \
           | util.GroupIntoBatches(GroupIntoBatchesTest.BATCH_SIZE) \
           | beam.Map(print_elements)
   ```
   
   Printout:
   ```py
   [('key', 'Einstein'), ('key', 'Darwin'), ('key', 'Copernicus'), ('key', 'Pasteur'), ('key', 'Curie')]
   [('key', 'Faraday'), ('key', 'Newton'), ('key', 'Bohr'), ('key', 'Galilei'), ('key', 'Maxwell')]
   ```
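
   For a single key in a single window, the batching behavior shown in the printout can be mimicked by a plain-Python sketch (a hypothetical stand-in, not Beam's implementation — the real transform batches per key and window via state and timers):

   ```python
   from itertools import islice

   def group_into_batches(kvs, batch_size):
       # Emit lists of (key, value) pairs in arrival order, batch_size at
       # a time; note the output is batches of KV pairs, not grouped by key.
       it = iter(kvs)
       while True:
           batch = list(islice(it, batch_size))
           if not batch:
               return
           yield batch
   ```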









[GitHub] [beam] udim commented on a change in pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()

2020-06-24 Thread GitBox


udim commented on a change in pull request #12009:
URL: https://github.com/apache/beam/pull/12009#discussion_r445246250



##
File path: sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
##
@@ -40,6 +40,14 @@ def process(self, element: int) -> typehints.Tuple[str]:
     with self.assertRaisesRegex(typehints.TypeCheckError,
                                 r'requires.*int.*got.*str'):
       _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
+
+  def test_pardo_dofn(self):

Review comment:
   Did you mean to leave this test here? It looks like a copy of the one in 
AnnotationsTest.

##
File path: sdks/python/apache_beam/typehints/typehints_test_py3.py
##
@@ -46,11 +51,61 @@ class MyDoFn(DoFn):
       def process(self, element: int) -> Iterable[str]:
         pass
 
-    print(MyDoFn().get_type_hints())
     th = MyDoFn().get_type_hints()
     self.assertEqual(th.input_types, ((int, ), {}))
     self.assertEqual(th.output_types, ((str, ), {}))
 
+class TestPTransformAnnotations(unittest.TestCase):
+  def test_pep484_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_pcollection_wrapper(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: int) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    with self.assertRaises(TypeCheckError) as error:
+      _th = MyPTransform().get_type_hints()
+
+    self.assertEqual(
+        str(error.exception),
+        'An input typehint to a PTransform must be a single (or nested) type '
+        'wrapped by a PCollection.')
+
+  def test_annotations_without_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection) -> PCollection:

Review comment:
   This is valid. The type hint should convert to `Any`.
   
   Quoting from https://docs.python.org/3/library/typing.html:
   > Using a generic class without specifying type parameters assumes Any for 
each position.
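
   The rule quoted above can be checked directly with `typing.get_args`; a small sketch using a toy generic (not Beam's actual `PCollection`):

   ```python
   from typing import Any, Generic, TypeVar, get_args

   T = TypeVar('T')

   class PCollection(Generic[T]):
       # Toy stand-in, just to probe annotations.
       pass

   def element_hints(annotation):
       # A bare generic carries no type arguments; per the quoted rule,
       # that should be read as Any for each position.
       args = get_args(annotation)
       return args if args else (Any,)
   ```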

##
File path: sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
##
@@ -257,6 +265,65 @@ def fn2(element: int) -> int:
     result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
     self.assertCountEqual([4, 6], result)
 
+  def test_typed_ptransform_with_no_error(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    try:
+      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+    except Exception:
+      self.fail('An unexpected error was raised during a pipeline with correct typehints.')
+
+  def test_typed_ptransform_with_bad_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaises(typehints.TypeCheckError) as error:
+      # raises error because of mismatched typehints between StrToInt and IntToStr
+      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+    self.assertTrue(
+        "Input type hint violation at IntToStr: expected <class 'str'>, got "
+        "<class 'int'>" in str(error.exception))
+
+  def test_typed_ptransform_with_bad_input(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaises(typehints.TypeCheckError) as error:
+      # Feed integers to a PTransform that expects strings
+      _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+    self.assertTrue(
+        "Input type hint violation at StrToInt: expected <class 'str'>, got "
+        "<class 'int'>" in str(error.exception))

Review comment:
   Please use `with self.assertRaisesRegex(..)` above instead of separately 
checking the exception text.
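
   The suggested pattern combines the type check and the message match in one context manager; a minimal self-contained example, with a made-up exception and message:

   ```python
   import unittest

   class AssertRaisesRegexDemo(unittest.TestCase):
       # assertRaisesRegex checks the exception type and matches str(exception)
       # against a regex, replacing a separate assertTrue on the message text.
       def test_message_matched(self):
           with self.assertRaisesRegex(ValueError, r"expected .* got"):
               raise ValueError("expected <class 'str'>, got <class 'int'>")
   ```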

##
File path: sdks/python/apache_beam/transforms/ptransform.py
##
@@ -364,6 +366,15 @@ def default_label(self):
     # type: () -> str
     return self.__class__.__name__
 
+  def default_type_hints(self):
+    fn_type_hints = IOTypeHints.from_callable(self.expand)
+    if fn_type_hints is 

[GitHub] [beam] aaltay opened a new pull request #12088: Add output typehints to GroupIntoBatches

2020-06-24 Thread GitBox


aaltay opened a new pull request #12088:
URL: https://github.com/apache/beam/pull/12088


   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] TheNeuralBit commented on pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances

2020-06-24 Thread GitBox


TheNeuralBit commented on pull request #12067:
URL: https://github.com/apache/beam/pull/12067#issuecomment-649168647


   I have a POC working that solves the problem by fixing all of the component 
IDs in the external transforms in `to_runner_api()`. It feels pretty 
error-prone, but it doesn't maintain any global state, and keeps all of the 
special logic within ExternalTransform. I'll clean it up and push it here 
tomorrow morning.
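
   One common shape for this kind of consistent ID assignment is a map that hands out a deterministic ID the first time a component is seen and reuses it thereafter, so repeated `to_runner_api()` calls agree. A hedged sketch of the idea, not the actual `PipelineContext` code:

   ```python
   class ComponentIdMap:
       # Assigns "<label>_<n>" ids deterministically in first-seen order
       # and returns the same id for the same component every time.
       def __init__(self):
           self._ids = {}
           self._counters = {}

       def get_or_assign(self, component, label):
           if component not in self._ids:
               n = self._counters.get(label, 0)
               self._counters[label] = n + 1
               self._ids[component] = f"{label}_{n}"
           return self._ids[component]
   ```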







[GitHub] [beam] ihji commented on pull request #12085: [BEAM-10318] fix uninitialized grpc_server in FnApiRunner

2020-06-24 Thread GitBox


ihji commented on pull request #12085:
URL: https://github.com/apache/beam/pull/12085#issuecomment-649161136


   R: @chamikaramj 







[GitHub] [beam] ihji commented on pull request #12086: [BEAM-10322] allow only single assignment to producing stages by pcol…

2020-06-24 Thread GitBox


ihji commented on pull request #12086:
URL: https://github.com/apache/beam/pull/12086#issuecomment-649161049


   R: @pabloem
   CC: @robertwb 







[GitHub] [beam] ihji commented on a change in pull request #12060: [BEAM-10218] Add FnApiRunner to cross-language validate runner test

2020-06-24 Thread GitBox


ihji commented on a change in pull request #12060:
URL: https://github.com/apache/beam/pull/12060#discussion_r445260087



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##
@@ -375,6 +375,8 @@ def get_all_side_inputs():
 for o in transform.outputs.values():
   if o in s.side_inputs():
 continue
+  if o in producing_stages_by_pcoll:

Review comment:
   Done.
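
   The guard in the hunk above amounts to first-writer-wins on the producing-stages map: once a PCollection has a recorded producer, later candidates are skipped. A hypothetical helper illustrating the idea:

   ```python
   def record_producer(producing_stages_by_pcoll, pcoll, stage):
       # Keep only the first producing stage seen for each PCollection;
       # subsequent assignments are ignored rather than overwriting.
       if pcoll in producing_stages_by_pcoll:
           return producing_stages_by_pcoll[pcoll]
       producing_stages_by_pcoll[pcoll] = stage
       return stage
   ```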









[GitHub] [beam] ihji commented on pull request #12087: [BEAM-10321] retain environments in flatten for preventing it from be…

2020-06-24 Thread GitBox


ihji commented on pull request #12087:
URL: https://github.com/apache/beam/pull/12087#issuecomment-649160879


   R: @pabloem 
   CC: @robertwb 







[GitHub] [beam] udim commented on a change in pull request #12076: [BEAM-10143] Add test verifying external windowing

2020-06-24 Thread GitBox


udim commented on a change in pull request #12076:
URL: https://github.com/apache/beam/pull/12076#discussion_r445259660



##
File path: sdks/python/apache_beam/transforms/sql_test.py
##
@@ -157,6 +157,21 @@ def test_zetasql_generate_data(self):
   dialect="zetasql")
   assert_that(out, equal_to([(1, "foo", 3.14)]))
 
+  def test_windowing_before_sql(self):
+    with TestPipeline() as p:
+      windowed = (
+          p | beam.Create([
+              SimpleRow(5, "foo", 1.),
+              SimpleRow(15, "bar", 2.),
+              SimpleRow(25, "baz", 3.)
+          ])
+          | beam.Map(lambda v: beam.window.TimestampedValue(v, v.id))
+          .with_output_types(SimpleRow)

Review comment:
   https://issues.apache.org/jira/browse/BEAM-10323









[GitHub] [beam] ihji opened a new pull request #12086: [BEAM-10322] allow only single assignment to producing stages by pcol…

2020-06-24 Thread GitBox


ihji opened a new pull request #12086:
URL: https://github.com/apache/beam/pull/12086


   …lection map
   
   
   
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] ihji opened a new pull request #12085: [BEAM-10318] fix uninitialized grpc_server in FnApiRunner

2020-06-24 Thread GitBox


ihji opened a new pull request #12085:
URL: https://github.com/apache/beam/pull/12085


   
   
   
   
   Post-Commit Tests Status (on master branch)
   

   
   [Build status badges for the Go, Java, and Python post-commit jobs]
 

[GitHub] [beam] udim commented on a change in pull request #12076: [BEAM-10143] Add test verifying external windowing

2020-06-24 Thread GitBox


udim commented on a change in pull request #12076:
URL: https://github.com/apache/beam/pull/12076#discussion_r445257811



##
File path: sdks/python/apache_beam/transforms/sql_test.py
##
@@ -157,6 +157,21 @@ def test_zetasql_generate_data(self):
   dialect="zetasql")
   assert_that(out, equal_to([(1, "foo", 3.14)]))
 
+  def test_windowing_before_sql(self):
+with TestPipeline() as p:
+  windowed = (
+  p | beam.Create([
+  SimpleRow(5, "foo", 1.),
+  SimpleRow(15, "bar", 2.),
+  SimpleRow(25, "baz", 3.)
+  ])
+  | beam.Map(lambda v: beam.window.TimestampedValue(v, v.id)).
+  with_output_types(SimpleRow)

Review comment:
   I'm not sure there is a more elegant way. Even if you turn that lambda 
into a function, the output type decorator and the actual return value will disagree:
   ```py
 def test_timestamped_value(self):
   @beam.typehints.with_input_types(int)
   @beam.typehints.with_output_types(int)
   def timestamped(e):
 return beam.window.TimestampedValue(e, 0)
   
   with TestPipeline() as p:
 pcoll = p | beam.Create([1, 2, 3]) | beam.Map(timestamped)
 self.assertEqual(int, pcoll.element_type)
   ```
   I prefer the above style to inlining `.with_output_types` so it's clear I'm 
not making an exception.
   
   This mismatch is normal in Beam: a DoFn.process()'s return type and type 
hint disagree as well. We should probably add functionality to support 
annotating the above function like this:
   ```py
   def timestamped(e: int) -> beam.window.TimestampedValue[int]:
 return beam.window.TimestampedValue(e, 0)
   ```
   And `pcoll.element_type` will be interpreted as `int`.
   Same for `WindowedValues`.
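   As a language-level illustration (independent of Beam, using a stand-in 
`TimestampedValue` class and a hypothetical `element_type` helper that does not 
exist in the SDK), unwrapping the wrapper from the return annotation could look 
like this:

```python
import typing

T = typing.TypeVar("T")

class TimestampedValue(typing.Generic[T]):
    # Stand-in for beam.window.TimestampedValue: a value plus a timestamp.
    def __init__(self, value: T, timestamp: float):
        self.value = value
        self.timestamp = timestamp

def element_type(func):
    # Hypothetical helper: derive the element type from the return
    # annotation, interpreting TimestampedValue[X] as plain X.
    ret = typing.get_type_hints(func).get("return")
    if typing.get_origin(ret) is TimestampedValue:
        return typing.get_args(ret)[0]
    return ret

def timestamped(e: int) -> TimestampedValue[int]:
    return TimestampedValue(e, 0)

print(element_type(timestamped))  # -> <class 'int'>
```

   Here `element_type` is invented purely for illustration; Beam's type-inference 
layer would need equivalent logic to interpret `pcoll.element_type` as `int`.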





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] chamikaramj commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader

2020-06-24 Thread GitBox


chamikaramj commented on pull request #12070:
URL: https://github.com/apache/beam/pull/12070#issuecomment-649157020


   Please add a JIRA







[GitHub] [beam] chamikaramj commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader

2020-06-24 Thread GitBox


chamikaramj commented on a change in pull request #12070:
URL: https://github.com/apache/beam/pull/12070#discussion_r445255937



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##
@@ -219,7 +232,15 @@ private synchronized boolean readNextRecord() throws 
IOException {
 }
 
 fractionConsumedFromPreviousResponse = 
fractionConsumedFromCurrentResponse;
-ReadRowsResponse currentResponse = responseIterator.next();
+ReadRowsResponse currentResponse;
+Stopwatch stopwatch = Stopwatch.createStarted();

Review comment:
   Do you think these operations can have a performance impact? If so, would 
it make sense to add an option to enable/disable them?
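   The enable/disable pattern being suggested could look roughly like this (a 
Python sketch with a hypothetical `enable_metrics` flag, not the actual 
BigQueryIO reader code):

```python
import time

class StreamReader:
    """Toy reader that optionally times each response fetch, so the
    metric-collection overhead can be switched off entirely."""

    def __init__(self, responses, enable_metrics=True):
        self._responses = iter(responses)
        self._enable_metrics = enable_metrics
        self.total_wait_secs = 0.0  # accumulated time spent waiting on next()

    def next_response(self):
        if not self._enable_metrics:
            # Fast path: no timing at all when metrics are disabled.
            return next(self._responses)
        start = time.monotonic()  # plays the role of Stopwatch.createStarted()
        response = next(self._responses)
        self.total_wait_secs += time.monotonic() - start
        return response

reader = StreamReader(["r1", "r2"])
assert reader.next_response() == "r1"
```

   `time.monotonic` stands in for Guava's `Stopwatch` here; the point is only 
that when metrics are off the per-record cost reduces to a single branch.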









[GitHub] [beam] chamikaramj commented on a change in pull request #12060: [BEAM-10218] Add FnApiRunner to cross-language validate runner test

2020-06-24 Thread GitBox


chamikaramj commented on a change in pull request #12060:
URL: https://github.com/apache/beam/pull/12060#discussion_r445254132



##
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##
@@ -375,6 +375,8 @@ def get_all_side_inputs():
 for o in transform.outputs.values():
   if o in s.side_inputs():
 continue
+  if o in producing_stages_by_pcoll:

Review comment:
   Could you send the runner changes as separate PR(s) ?









[GitHub] [beam] chamikaramj commented on pull request #11850: [BEAM-1438] Allow 0 shards on WriteFiles streaming

2020-06-24 Thread GitBox


chamikaramj commented on pull request #11850:
URL: https://github.com/apache/beam/pull/11850#issuecomment-649153502


   Have we tried this out on Dataflow streaming pipelines to make sure that it 
works as expected?
   We should probably also add an integration/performance test for this case, 
given that many of our users will end up using this path instead of 
specifying a value.
   
   cc: @dpmills @slavachernyak @reuvenlax 







[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

2020-06-24 Thread GitBox


ajamato commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-649150134


   @pabloem @chamikaramj 







[GitHub] [beam] ajamato opened a new pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

2020-06-24 Thread GitBox


ajamato opened a new pull request #12084:
URL: https://github.com/apache/beam/pull/12084


   [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the 
Dataflow Job ID
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   [Build status badges for the Go, Java, and Python post-commit jobs]
 

[GitHub] [beam] chamikaramj merged pull request #11834: [BEAM-10117] Correct erroneous Job Failed message

2020-06-24 Thread GitBox


chamikaramj merged pull request #11834:
URL: https://github.com/apache/beam/pull/11834


   







[GitHub] [beam] chamikaramj commented on pull request #11834: [BEAM-10117] Correct erroneous Job Failed message

2020-06-24 Thread GitBox


chamikaramj commented on pull request #11834:
URL: https://github.com/apache/beam/pull/11834#issuecomment-649147313


   LGTM. Thanks.
   
   Sorry about the delay.







[GitHub] [beam] ajamato commented on pull request #12083: [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

2020-06-24 Thread GitBox


ajamato commented on pull request #12083:
URL: https://github.com/apache/beam/pull/12083#issuecomment-649146663


   R: @pabloem @chamikaramj 







[GitHub] [beam] ajamato opened a new pull request #12083: [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

2020-06-24 Thread GitBox


ajamato opened a new pull request #12083:
URL: https://github.com/apache/beam/pull/12083


   [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow 
Job ID
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   [Build status badges for the Go, Java, and Python post-commit jobs]
 

[GitHub] [beam] chamikaramj commented on pull request #11453: Don't use protobuf toString for BQ storage API protos

2020-06-24 Thread GitBox


chamikaramj commented on pull request #11453:
URL: https://github.com/apache/beam/pull/11453#issuecomment-649146270


   Ping?







[GitHub] [beam] apilloud commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


apilloud commented on a change in pull request #12054:
URL: https://github.com/apache/beam/pull/12054#discussion_r445244602



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java
##
@@ -27,4 +28,7 @@ private SqlTypes() {}
 
   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
   public static final LogicalType DATE = new Date();
+
+  /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
+  public static final LogicalType TIME = new Time();

Review comment:
   This is the most important line of this CL for correctness; I wanted to 
get attention on it early.
   
   ZetaSQL uses a very specific format for this, which roughly matches the 
struct in LocalTime: 
https://github.com/google/zetasql/blob/79adcd0fe227173e68ed7aa88f580a691ebe82c2/zetasql/public/civil_time.h#L85
   
   It appears the Long base type is an offset in nanoseconds. Is that 
conversion lossless? Is it expensive?
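   For intuition on the losslessness question, a nanoseconds-of-day encoding 
round-trips civil times exactly and fits comfortably in a signed 64-bit long. 
This sketch uses a plain arithmetic encoding as an assumption about the general 
approach; the actual ZetaSQL representation packs bit fields, so it is not the 
exact layout:

```python
NANOS_PER_SECOND = 1_000_000_000

def time_to_nanos(hour, minute, second, nanos):
    # Encode a civil time-of-day as one offset in nanoseconds since midnight.
    return ((hour * 60 + minute) * 60 + second) * NANOS_PER_SECOND + nanos

def nanos_to_time(offset):
    # Decode a nanosecond-of-day offset back into (hour, minute, second, nanos).
    seconds, nanos = divmod(offset, NANOS_PER_SECOND)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return hour, minute, second, nanos

# The largest time of day is well under 2**63, and the round trip is exact.
t = (23, 59, 59, 999_999_999)
assert nanos_to_time(time_to_nanos(*t)) == t
assert time_to_nanos(*t) < 2**63
```

   Under this encoding the conversion is lossless for any valid time of day and 
costs only a few multiply-adds each way.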









[GitHub] [beam] robinyqiu commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


robinyqiu commented on pull request #12054:
URL: https://github.com/apache/beam/pull/12054#issuecomment-649145158


   retest this please







[GitHub] [beam] udim commented on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()

2020-06-24 Thread GitBox


udim commented on pull request #12009:
URL: https://github.com/apache/beam/pull/12009#issuecomment-649145031


   retest this please







[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445242837



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -198,6 +213,154 @@
  *...
  * }
  *
+ * Read from Kafka as a {@link DoFn}
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the blog post (https://beam.apache.org/blog/splittable-do-fn/) and the design doc (https://s.apache.org/beam-fn-api). The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadAll#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *   KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   {@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *   KafkaIO.Read#getValueDeserializerProvider()}.
+ *   {@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *   KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * 
+ *
+ * For example, to create a basic {@link ReadAll} transform:
+ *
+ * {@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *  .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *  .withKeyDeserializer(LongDeserializer.class).
+ *  .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *KafkaSourceDescription.of(
+ *  new TopicPartition("topic", 1),
+ *  null,
+ *  null,
+ *  ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ * .withKeyDeserializer(LongDeserializer.class).
+ * .withValueDeserializer(StringDeserializer.class));
+ *
+ * }
+ *
+ * Configurations of {@link ReadAll}
+ *
+ * Apart from the Kafka consumer configurations, there are some other configurations
+ * related to processing records.
+ *
+ * {@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * {@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * For example, to create a {@link ReadAll} with these configurations:
+ *
+ * {@code
+ * pipeline
+ * .apply(Create.of(
+ *KafkaSourceDescription.of(
+ *  new TopicPartition("topic", 1),
+ *  null,
+ *  null,
+ *  ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *  .withKeyDeserializer(LongDeserializer.class).
+ *  .withValueDeserializer(StringDeserializer.class)
+ *  .withProcessingTime()
+ *  .commitOffsets());
+ *
+ * }
+ *
+ * Read from {@link KafkaSourceDescription}
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 

[GitHub] [beam] pabloem commented on pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs

2020-06-24 Thread GitBox


pabloem commented on pull request #12082:
URL: https://github.com/apache/beam/pull/12082#issuecomment-649140269


   retest this please







[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445236767



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {

Review comment:
   It seems like `Descriptor` makes more sense.









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445235681



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) {
 }
   }
 
-  

-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
*/
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>
+      extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> {
+
+private static final Logger LOG = LoggerFactory.getLogger(ReadAll.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider<K> getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider<V> getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadAll.Builder<K, V> toBuilder();
+
+@AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadAll.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadAll.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadAll.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadAll.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider<K> deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider<V> deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadAll.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadAll.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadAll.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadAll.Builder<K, V> setCommitOffsetEnabled(boolean commitOffsetEnabled);
+
+      abstract ReadAll.Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadAll<K, V> build();
+    }
 
-    for (String key : updates.keySet()) {
+    public static <K, V> ReadAll<K, V> read() {
+      return new AutoValue_KafkaIO_ReadAll.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element,
+    // the element will override the bootstrapServers from the config.
+    public ReadAll<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerProvider(DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadAll<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return 

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445226123



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")
+  @Nullable
+  abstract List<String> getBootStrapServers();
+
+  private TopicPartition topicPartition = null;
+
+  public TopicPartition getTopicPartition() {

Review comment:
   Thanks! 









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445225947



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +1082,91 @@ public void setValueDeserializer(String valueDeserializer) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-      Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(
-              toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+      // The Read will be expanded into SDF transform when "beam_fn_api" is enabled and
+      // "beam_fn_api_use_deprecated_read" is not enabled.
+      if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          || ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+        // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+        Unbounded<KafkaRecord<K, V>> unbounded =
+            org.apache.beam.sdk.io.Read.from(
+                toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+
+        PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+        if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+          transform =
+              unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+        }
 
-      PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+        return input.getPipeline().apply(transform);
+  }
+      ReadAll<K, V> readTransform =
+          ReadAll.<K, V>read()
+              .withConsumerConfigOverrides(getConsumerConfig())
+              .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
+              .withConsumerFactoryFn(getConsumerFactoryFn())
+              .withKeyDeserializerProvider(getKeyDeserializerProvider())
+              .withValueDeserializerProvider(getValueDeserializerProvider())
+              .withManualWatermarkEstimator()
+              .withTimestampPolicyFactory(getTimestampPolicyFactory());
+      if (isCommitOffsetsInFinalizeEnabled()) {
+        readTransform = readTransform.commitOffsets();
+      }
+      PCollection<KafkaSourceDescription> output =
+          input
+              .getPipeline()
+              .apply(Impulse.create())
+              .apply(ParDo.of(new GenerateKafkaSourceDescription(this)));
+      try {
+        output.setCoder(KafkaSourceDescription.getCoder(input.getPipeline().getSchemaRegistry()));

Review comment:
   It works with `setSchema`, but I want to make it explicit because it's possible that a user writes a DoFn which produces `KafkaSourceDescription`.









[GitHub] [beam] tysonjh commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.

2020-06-24 Thread GitBox


tysonjh commented on pull request #12020:
URL: https://github.com/apache/beam/pull/12020#issuecomment-649124368


   retest this please







[GitHub] [beam] tysonjh commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.

2020-06-24 Thread GitBox


tysonjh commented on pull request #12020:
URL: https://github.com/apache/beam/pull/12020#issuecomment-649121635


   > Ah, I'm wrong. [CDDL is Category 
B](https://www.apache.org/legal/resolved.html#category-b)
   
   Should I update the comments to be more clear? Perhaps having it mention GPL 
2.0 and the prohibited link is confusing and I should just reference CDDL?







[GitHub] [beam] youngoli merged pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.

2020-06-24 Thread GitBox


youngoli merged pull request #12081:
URL: https://github.com/apache/beam/pull/12081


   







[GitHub] [beam] allenpradeep edited a comment on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

2020-06-24 Thread GitBox


allenpradeep edited a comment on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-649119157


   Can we merge this PR? I would want to send out a PR to count bytes written 
to spanner and that would be dependent on this.







[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'

2020-06-24 Thread GitBox


allenpradeep commented on pull request #11570:
URL: https://github.com/apache/beam/pull/11570#issuecomment-649119157


   Can we merge this patch? I have a patch to count bytes written to spanner 
which would be dependent on this.







[GitHub] [beam] kennknowles commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.

2020-06-24 Thread GitBox


kennknowles commented on pull request #12020:
URL: https://github.com/apache/beam/pull/12020#issuecomment-649111027


   Ah, I'm wrong. [CDDL is Category 
B](https://www.apache.org/legal/resolved.html#category-b)







[GitHub] [beam] kennknowles commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.

2020-06-24 Thread GitBox


kennknowles commented on pull request #12020:
URL: https://github.com/apache/beam/pull/12020#issuecomment-649110871


   Neither CDDL nor GPL 2.0 would be allowable.
   
   But I believe [this library's 
license](https://github.com/javaee/javax.annotation/blob/83417807ad402ee1022c0307208d4510c80c68b6/LICENSE#L743)
 is [GPL 2.0 with the classpath 
exception](https://openjdk.java.net/legal/gplv2+ce.html), which is noted as an 
exception under the [Category X 
licenses](https://www.apache.org/legal/resolved.html#category-x).







[GitHub] [beam] lukecwik commented on pull request #11406: [BEAM-9748] Refactor Reparallelize as an alternative Reshuffle implementation

2020-06-24 Thread GitBox


lukecwik commented on pull request #11406:
URL: https://github.com/apache/beam/pull/11406#issuecomment-649110837


   ping?







[GitHub] [beam] kennknowles commented on pull request #11886: [BEAM-8647] delete .mailmap

2020-06-24 Thread GitBox


kennknowles commented on pull request #11886:
URL: https://github.com/apache/beam/pull/11886#issuecomment-649108755


   I see now that the thread link that Ismaël provided is the root discussion that we should follow to resolve this. I want to emphasize that I would, of course, also support individual requests to remove information from this file. Anyone removing their information should be warned that doing so does not protect the information or make it less public.







[GitHub] [beam] robinyqiu commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


robinyqiu commented on pull request #12054:
URL: https://github.com/apache/beam/pull/12054#issuecomment-649107647


   Oops, 
`BeamSqlDateFunctionsIntegrationTest.testDateTimeFunctions_currentTime` seems 
to be failing. Is it failing on your machine as well? That could be a flaky 
test.







[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

2020-06-24 Thread GitBox


lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r445207524



##
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
 ParDoPayload payload = getParDoPayload(transform);
-return payload.getStateSpecsCount() > 0 || 
payload.getTimerFamilySpecsCount() > 0;
+return payload.getStateSpecsCount() > 0
+|| payload.getTimerFamilySpecsCount() > 0
+|| payload.getRequiresTimeSortedInput();

Review comment:
   I think modelling how something is executed, and keeping that separate from the user's definition of their transform, is important (so using the PTransformMatcher makes a lot of sense, since not all runners will use state). For example, a runner can sort using its shuffle implementation with the timestamp as the sort key (this is something that Dataflow does for some batch pipelines), and other runners may choose to do this as well.









[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

2020-06-24 Thread GitBox


lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r445207524



##
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
 ParDoPayload payload = getParDoPayload(transform);
-return payload.getStateSpecsCount() > 0 || 
payload.getTimerFamilySpecsCount() > 0;
+return payload.getStateSpecsCount() > 0
+|| payload.getTimerFamilySpecsCount() > 0
+|| payload.getRequiresTimeSortedInput();

Review comment:
   I think modelling how something is executed using the PTransformMatcher 
makes a lot of sense since not all runners will use state. For example, a 
runner can sort using their shuffle implementation where the timestamp is the 
sort key (this is something that Dataflow does for some batch pipelines) and 
other runners may choose to do this as well.









[GitHub] [beam] robinyqiu commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


robinyqiu commented on a change in pull request #12054:
URL: https://github.com/apache/beam/pull/12054#discussion_r444548460



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Time.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.logicaltypes;
+
+import java.time.LocalTime;
+import org.apache.beam.sdk.schemas.Schema;
+
+public class Time implements Schema.LogicalType<LocalTime, Long> {

Review comment:
   Please add class-level javadoc for this (see `DATE` for example.)

##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##
@@ -315,7 +317,7 @@ private static Expression castOutput(Expression value, FieldType toType) {
   private static Expression castOutputTime(Expression value, FieldType toType) {
 Expression valueDateTime = value;
 
-// First, convert to millis (except for DATE type)
+// First, convert to millis (except for DATE/TIME type)

Review comment:
   Seems we can combine the first and second step now (and update the 
comments). The code will look much simpler that way.

##
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##
@@ -2457,6 +2458,256 @@ public void testDateFromUnixInt64() {
 
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+  // TIME type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testTimeLiteral() {
+String sql = "SELECT TIME '15:30:00'";
+
+ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+PAssert.that(stream)
+.containsInAnyOrder(
+Row.withSchema(Schema.builder().addLogicalTypeField("f_time", 
SqlTypes.TIME).build())
+.addValues(LocalTime.of(15, 30, 0, 0))
+.build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testTimeColumn() {
+String sql = "SELECT FORMAT_TIME('%T', time_field) FROM table_with_time";
+
+ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+PAssert.that(stream)
+.containsInAnyOrder(
+
Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+.addValues("15:30:00")
+.build(),
+
Row.withSchema(Schema.builder().addStringField("f_time_str").build())
+.addValues("23:35:59")
+.build());
+
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  // TODO[BEAM-9166]: Add a test for CURRENT_TIME function ("SELECT 
CURRENT_TIME()")
+
+  @Test
+  public void testExtractTime() {
+String sql =
+"SELECT "
++ "EXTRACT(HOUR FROM TIME '15:30:35') as hour, "
++ "EXTRACT(MINUTE FROM TIME '15:30:35') as minute, "
++ "EXTRACT(SECOND FROM TIME '15:30:35') as second, "
++ "EXTRACT(MILLISECOND FROM TIME '15:30:35') as millisecond, "
++ "EXTRACT(MICROSECOND FROM TIME '15:30:35') as microsecond ";
+
+ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+final Schema schema =
+Schema.builder()
+.addField("hour", FieldType.INT64)
+.addField("minute", FieldType.INT64)
+.addField("second", FieldType.INT64)
+.addField("millisecond", FieldType.INT64)
+ 

[GitHub] [beam] kennknowles edited a comment on pull request #11886: [BEAM-8647] delete .mailmap

2020-06-24 Thread GitBox


kennknowles edited a comment on pull request #11886:
URL: https://github.com/apache/beam/pull/11886#issuecomment-649105139


   FWIW I am not sure everyone on this PR is familiar with the file. It is a 
config file to help git pretty print the log.
   
   See the "List Of Contributors" on https://beam.apache.org/blog/beam-2.22.0/. 
Without this file, the list will have duplicate entries like `Kenneth Knowles 
` and `Kenn Knowles ` and possibly some that 
are just `k...@google.com` etc. The file is a hint to git to de-dupe these when 
printing the log.
   
   If anything, using GitHub APIs is worse, because it associates actual 
registered accounts that are connected to other information. All this file does 
is associate strings that already exist in the log. But, as noted, that 
information is also all public.







[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns

2020-06-24 Thread GitBox


lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r445205831



##
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
   false);
 }
 
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,
+  UsesRequiresTimeSortedInput.class,
+  UsesStrictTimerOrdering.class,
+  UsesTestStream.class
+})
+public void testRequiresTimeSortedInputWithStatelessDoFn() {
+  // generate list long enough to rule out random shuffle in sorted order
+  int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+@Test
+@Category({
+  ValidatesRunner.class,
+  UsesStatefulParDo.class,

Review comment:
   makes sense









[GitHub] [beam] kennknowles commented on pull request #11886: [BEAM-8647] delete .mailmap

2020-06-24 Thread GitBox


kennknowles commented on pull request #11886:
URL: https://github.com/apache/beam/pull/11886#issuecomment-649105139


   FWIW I am not sure everyone on this PR is familiar with the file. It is a 
config file to help git pretty print the log.
   
   See the "List Of Contributors" on https://beam.apache.org/blog/beam-2.22.0/. 
Without this file, the list will have duplicate entries like `Kenneth Knowles 
` and `Kenn Knowles ` and possibly some that 
are just `k...@google.com` etc. The file is a hint to git to de-dupe these when 
printing the log.
   
   If anything, using GitHub APIs is worse, because it associates actual 
registered accounts that are connected to other information. All this file does 
is associate strings that already exist in the log.
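   The de-duplication described above can be demonstrated end to end with `git shortlog`, which is what release-note tooling builds on. The sketch below is illustrative only (the names, emails, and commit messages are invented, not taken from the Beam repository): it creates a throwaway repository with two commits from the "same" person under different identities, then adds a one-line `.mailmap` entry mapping the second identity onto the canonical one.

```shell
set -e
repo=$(mktemp -d)
cd "$repo"
git init -q

# Two commits by the "same" contributor under different identities.
git -c user.name="Jane Doe" -c user.email="jane@apache.invalid" \
    commit -q --allow-empty -m "first change"
git -c user.name="Jane M. Doe" -c user.email="jane@corp.invalid" \
    commit -q --allow-empty -m "second change"

# Without .mailmap, shortlog reports two distinct contributors.
git shortlog -sne HEAD

# One mapping line: canonical identity, then the identity to fold into it.
printf 'Jane Doe <jane@apache.invalid> Jane M. Doe <jane@corp.invalid>\n' > .mailmap

# With .mailmap, both commits are attributed to the canonical identity.
git shortlog -sne HEAD
```

   Running `git shortlog -sne` before and after writing `.mailmap` shows the two entries collapsing into a single `Jane Doe <jane@apache.invalid>` line with a count of 2; the file only re-labels identity strings that already exist in the log, exactly as noted above.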







[GitHub] [beam] chamikaramj merged pull request #11911: [BEAM-10186] Sends an empty response to the runner instead of failing

2020-06-24 Thread GitBox


chamikaramj merged pull request #11911:
URL: https://github.com/apache/beam/pull/11911


   







[GitHub] [beam] kennknowles commented on pull request #11886: [BEAM-8647] delete .mailmap

2020-06-24 Thread GitBox


kennknowles commented on pull request #11886:
URL: https://github.com/apache/beam/pull/11886#issuecomment-649102442


   This file is very useful for producing the release notes and integrating 
with various git stats tools. It is just a reformatting of information 
available in the git log. I will read the linked thread.







[GitHub] [beam] youngoli commented on pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.

2020-06-24 Thread GitBox


youngoli commented on pull request #12081:
URL: https://github.com/apache/beam/pull/12081#issuecomment-649098051


   I forgot that you could call len on nil slices, that's much better. Done.







[GitHub] [beam] lukecwik commented on pull request #12051: [BEAM-10298] beam-linkage-check.sh should not swallow errors

2020-06-24 Thread GitBox


lukecwik commented on pull request #12051:
URL: https://github.com/apache/beam/pull/12051#issuecomment-649095296


   Run Java PreCommit







[GitHub] [beam] ZijieSong946 commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL

2020-06-24 Thread GitBox


ZijieSong946 commented on pull request #12054:
URL: https://github.com/apache/beam/pull/12054#issuecomment-649093285


   Bug fixed. Thanks a lot.







[GitHub] [beam] pabloem opened a new pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs

2020-06-24 Thread GitBox


pabloem opened a new pull request #12082:
URL: https://github.com/apache/beam/pull/12082


   **Please** add a meaningful description for your change here
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Dataflow | Flink | Samza | Spark
   --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 

[GitHub] [beam] youngoli commented on pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.

2020-06-24 Thread GitBox


youngoli commented on pull request #12081:
URL: https://github.com/apache/beam/pull/12081#issuecomment-649086523


   R: @lostluck CC: @lukecwik 







[GitHub] [beam] youngoli opened a new pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.

2020-06-24 Thread GitBox


youngoli opened a new pull request #12081:
URL: https://github.com/apache/beam/pull/12081


   Fix a small oversight in the split code: I check for nil to mean any splits 
are OK, but an empty list should have the same behavior as nil.
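   The nil-vs-empty distinction described above is easy to trip over in any 
language. Below is a minimal sketch of the intended behavior, written in Java 
purely for illustration (the actual fix is in the Go SDK's DataSource; the 
class and method names here are hypothetical):

```java
import java.util.Collections;
import java.util.List;

public class SplitPoints {
    // A null list ("no restriction given") and an empty list should both
    // mean that any split point is acceptable.
    public static boolean allowsAnySplit(List<Long> allowedSplitPoints) {
        return allowedSplitPoints == null || allowedSplitPoints.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(allowsAnySplit(null));                     // true
        System.out.println(allowsAnySplit(Collections.emptyList())); // true
        System.out.println(allowsAnySplit(List.of(3L)));             // false
    }
}
```

   In Go the same effect falls out of `len`, since `len` of a nil slice is 0, 
which is why a single length check covers both cases.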
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [x] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   

[GitHub] [beam] ibzib merged pull request #12006: [BEAM-10257] Add option defaults for Spark Python tests

2020-06-24 Thread GitBox


ibzib merged pull request #12006:
URL: https://github.com/apache/beam/pull/12006


   







[GitHub] [beam] TheNeuralBit commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


TheNeuralBit commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445181264



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")
+  @Nullable
+  abstract List<String> getBootStrapServers();
+
+  private TopicPartition topicPartition = null;
+
+  public TopicPartition getTopicPartition() {

Review comment:
   This will get pulled into the generated schema, which I don't think is 
your intention. You should change the name so it's not a getter, or add 
`@SchemaIgnore`.
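   For context, getter-based schema inference typically scans zero-argument 
`get*` methods, which is why an unrelated getter leaks into the schema. A 
rough, simplified sketch of that scanning behavior (this is not Beam's actual 
`AutoValueSchema` logic, just an illustration of the mechanism):

```java
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

public class GetterScan {
    // Collect field names the way a naive getter-based schema inferrer would:
    // every public zero-arg method named get* becomes a field.
    public static Set<String> inferredFields(Class<?> clazz) {
        Set<String> fields = new TreeSet<>();
        for (Method m : clazz.getDeclaredMethods()) {
            if (m.getName().startsWith("get") && m.getParameterCount() == 0) {
                fields.add(m.getName().substring(3));
            }
        }
        return fields;
    }

    public static class Example {
        public String getTopic() { return "t"; }
        // Unintentionally picked up as a schema field because it is a getter.
        public String getTopicPartition() { return "t-0"; }
        // Renamed so it is no longer a getter; not picked up.
        public String topicPartition() { return "t-0"; }
    }
}
```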


[GitHub] [beam] annaqin418 commented on pull request #12006: [BEAM-10257] Add option defaults for Spark Python tests

2020-06-24 Thread GitBox


annaqin418 commented on pull request #12006:
URL: https://github.com/apache/beam/pull/12006#issuecomment-649070534


   LGTM!







[GitHub] [beam] amaliujia commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12079:
URL: https://github.com/apache/beam/pull/12079#discussion_r445169828



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##
@@ -383,4 +392,30 @@ public Long extractOutput(Long accum) {
   return accum;
 }
   }
+
+  static class BitAnd extends CombineFn<Long, Long, Long> {
+@Override
+public Long createAccumulator() {
+  return -1L;

Review comment:
   Oh, for that: pipeline run and then wait. Thanks, that's a nice catch. 








[GitHub] [beam] Imfuyuwei commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL

2020-06-24 Thread GitBox


Imfuyuwei commented on a change in pull request #12079:
URL: https://github.com/apache/beam/pull/12079#discussion_r445167391



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##
@@ -383,4 +392,30 @@ public Long extractOutput(Long accum) {
   return accum;
 }
   }
+
+  static class BitAnd extends CombineFn<Long, Long, Long> {
+@Override
+public Long createAccumulator() {
+  return -1L;

Review comment:
   Thanks!
   
   BTW, I want to mention that I also added one line at the end of the previous 
testBitOrFunction(). I noticed that without this line, the previous bit_or test 
would always pass no matter what expected result I set, which made it an 
invalid test. 
   
   It would be good if you could take a look.
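   The failure mode described here is common with deferred-assertion frameworks 
like Beam's PAssert: expectations registered on a pipeline are only evaluated 
when the pipeline is actually run. A toy, self-contained illustration of the 
mechanism (not Beam code; all names here are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class DeferredChecks {
    private final List<BooleanSupplier> checks = new ArrayList<>();

    // Registering an expectation does NOT evaluate it (like PAssert.that).
    public void expect(BooleanSupplier check) {
        checks.add(check);
    }

    // Expectations only fire here (like pipeline.run()). A test that never
    // calls run() therefore "passes" no matter what it expected.
    public boolean run() {
        return checks.stream().allMatch(BooleanSupplier::getAsBoolean);
    }

    public static void main(String[] args) {
        DeferredChecks pipeline = new DeferredChecks();
        pipeline.expect(() -> 1 + 1 == 3); // a wrong expectation
        System.out.println(pipeline.run()); // false — but only once run() is called
    }
}
```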













[GitHub] [beam] ibzib commented on pull request #12006: [BEAM-10257] Add option defaults for Spark Python tests

2020-06-24 Thread GitBox


ibzib commented on pull request #12006:
URL: https://github.com/apache/beam/pull/12006#issuecomment-649064265


   Run Python Spark ValidatesRunner







[GitHub] [beam] amaliujia merged pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL

2020-06-24 Thread GitBox


amaliujia merged pull request #12079:
URL: https://github.com/apache/beam/pull/12079


   







[GitHub] [beam] amaliujia commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12079:
URL: https://github.com/apache/beam/pull/12079#discussion_r445163189



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##
@@ -383,4 +392,30 @@ public Long extractOutput(Long accum) {
   return accum;
 }
   }
+
+  static class BitAnd extends CombineFn<Long, Long, Long> {
+@Override
+public Long createAccumulator() {
+  return -1L;

Review comment:
   That's a good point. I should go back and revisit the CMU 15213 course 
slides.













[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445150359



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRelTest.java
##
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.apache.beam.sdk.schemas.Schema;
+
+public class BeamMatchRelTest {
+
+  public static final TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void MatchLogicalPlanTest() {
+Schema schemaType = Schema.builder()
+.addInt32Field("id")
+.addStringField("name")
+.addInt32Field("proctime")
+.build();
+
+PCollection<Row> input =
+pipeline.apply(
+Create.of(
+
Row.withSchema(schemaType).addValue(1).addValue("a").addValue(1).build())
+.withRowSchema(schemaType));
+
+String sql = "SELECT T.aid, T.bid, T.cid " +
+"FROM PCOLLECTION " +
+"MATCH_RECOGNIZE (" +
+"PARTITION BY id " +
+"ORDER BY proctime " +
+"MEASURES " +
+"A.id AS aid, " +
+"B.id AS bid, " +
+"C.id AS cid " +
+"PATTERN (A B C) " +
+"DEFINE " +
+"A AS name = 'a', " +
+"B AS name = 'b', " +
+"C AS name = 'c' " +
+") AS T";
+
+PCollection<Row> result = input.apply(SqlTransform.query(sql));

Review comment:
   I see. Yes I think this test is valid. It tests if the query can be 
compiled, which includes the BeamMatchRel.










[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445087927



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.

Review comment:
   ```suggestion
* Represents a Kafka source description.
*
* Note that this object should be encoded/decoded with its corresponding 
{@link #getCoder schema coder}.
   ```

##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -198,6 +213,154 @@
  *...
  * }
  *
+ * <h3>Read from Kafka as a {@link DoFn}</h3>
+ *
+ * <p>{@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a 
href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a 
href="https://s.apache.org/beam-fn-api">design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.</li>
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.</li>
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.</li>
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.</li>
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.</li>
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.</li>
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.</li>
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.</li>
+ * </ul>
+ *
+ * For example, to create a basic {@link ReadAll} transform:
+ *
+ * {@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *  .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *  .withKeyDeserializer(LongDeserializer.class).
+ *  .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline

Review comment:
   ```suggestion
* {@code
   

[GitHub] [beam] Imfuyuwei commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL

2020-06-24 Thread GitBox


Imfuyuwei commented on a change in pull request #12079:
URL: https://github.com/apache/beam/pull/12079#discussion_r445160365



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##
@@ -383,4 +392,30 @@ public Long extractOutput(Long accum) {
   return accum;
 }
   }
+
+  static class BitAnd extends CombineFn<Long, Long, Long> {
+@Override
+public Long createAccumulator() {
+  return -1L;

Review comment:
   Because -1L is represented as 64 one-bits in binary, while 1L has only a 
single 1 at the least significant bit.
   
   In order to do the bit_and operation, I think the initial bit mask should 
consist of only 1s, so I use -1L.
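   As a quick sanity check (standalone Java, not the Beam `CombineFn` itself), -1L is the identity element for bitwise AND, so folding any inputs into a -1L accumulator yields their BIT_AND, while starting from 1L would zero out every bit above bit 0:

   ```java
public class BitAndIdentityDemo {
    public static void main(String[] args) {
        long acc = -1L; // all 64 bits set: the identity for &
        long[] inputs = {0b1110L, 0b1011L, 0b1010L};
        for (long v : inputs) {
            acc &= v; // the same fold the CombineFn performs per element
        }
        // 0b1110 & 0b1011 & 0b1010 == 0b1010
        System.out.println(acc); // prints 10

        // With 1L as the initial accumulator, bits above bit 0 are lost:
        long wrong = 1L;
        for (long v : inputs) {
            wrong &= v;
        }
        System.out.println(wrong); // prints 0
    }
}
   ```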





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on pull request #11749:
URL: https://github.com/apache/beam/pull/11749#issuecomment-649059330


   Run Python2_PVR_Flink PreCommit







[GitHub] [beam] chamikaramj commented on a change in pull request #11911: [BEAM-10186] Sends an empty response to the runner instead of failing

2020-06-24 Thread GitBox


chamikaramj commented on a change in pull request #11911:
URL: https://github.com/apache/beam/pull/11911#discussion_r445156490



##
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##
@@ -260,6 +260,46 @@ BundleProcessor get(
 }
   }
 
+  @Test
+  public void testTrySplitBeforeBundleDoesNotFail() {
+ProcessBundleHandler handler =
+new ProcessBundleHandler(
+PipelineOptionsFactory.create(),
+null,
+beamFnDataClient,
+null /* beamFnStateClient */,
+null /* finalizeBundleHandler */,
+ImmutableMap.of(),
+new BundleProcessorCache());
+
+handler.trySplit(

Review comment:
   Done.

##
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
##
@@ -260,6 +260,46 @@ BundleProcessor get(
 }
   }
 
+  @Test
+  public void testTrySplitBeforeBundleDoesNotFail() {
+ProcessBundleHandler handler =
+new ProcessBundleHandler(
+PipelineOptionsFactory.create(),
+null,
+beamFnDataClient,
+null /* beamFnStateClient */,
+null /* finalizeBundleHandler */,
+ImmutableMap.of(),
+new BundleProcessorCache());
+
+handler.trySplit(
+BeamFnApi.InstructionRequest.newBuilder()
+.setInstructionId("999L")
+.setProcessBundleSplit(
+
BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id"))
+.build());
+  }
+
+  @Test
+  public void testProgressBeforeBundleDoesNotFail() throws Exception {
+ProcessBundleHandler handler =
+new ProcessBundleHandler(
+PipelineOptionsFactory.create(),
+null,
+beamFnDataClient,
+null /* beamFnStateClient */,
+null /* finalizeBundleHandler */,
+ImmutableMap.of(),
+new BundleProcessorCache());
+
+handler.progress(

Review comment:
   Done.

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
##
@@ -341,53 +341,52 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
   throws Exception {
 BundleProcessor bundleProcessor =
 
bundleProcessorCache.find(request.getProcessBundleProgress().getInstructionId());
-if (bundleProcessor == null) {

Review comment:
   Done.









[GitHub] [beam] amaliujia commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12079:
URL: https://github.com/apache/beam/pull/12079#discussion_r445151779



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##
@@ -383,4 +392,30 @@ public Long extractOutput(Long accum) {
   return accum;
 }
   }
+
+  static class BitAnd extends CombineFn<Long, Long, Long> {
+@Override
+public Long createAccumulator() {
+  return -1L;

Review comment:
   -1L makes it a bit harder to read. Why not use 1L instead?









[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445150359



##
File path: 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRelTest.java
##
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.apache.beam.sdk.schemas.Schema;
+
+public class BeamMatchRelTest {
+
+  public static final TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void MatchLogicalPlanTest() {
+Schema schemaType = Schema.builder()
+.addInt32Field("id")
+.addStringField("name")
+.addInt32Field("proctime")
+.build();
+
+PCollection<Row> input =
+pipeline.apply(
+Create.of(
+
Row.withSchema(schemaType).addValue(1).addValue("a").addValue(1).build())
+.withRowSchema(schemaType));
+
+String sql = "SELECT T.aid, T.bid, T.cid " +
+"FROM PCOLLECTION " +
+"MATCH_RECOGNIZE (" +
+"PARTITION BY id " +
+"ORDER BY proctime " +
+"MEASURES " +
+"A.id AS aid, " +
+"B.id AS bid, " +
+"C.id AS cid " +
+"PATTERN (A B C) " +
+"DEFINE " +
+"A AS name = 'a', " +
+"B AS name = 'b', " +
+"C AS name = 'c' " +
+") AS T";
+
+PCollection<Row> result = input.apply(SqlTransform.query(sql));

Review comment:
   I see. Yes, I think this test is valid. It tests whether the query can be 
compiled, which exercises the BeamMatchRel.









[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445149049



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRel.java
##
@@ -0,0 +1,279 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexVariable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.util.*;
+
+import static 
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
+
+/** {@link BeamRelNode} to replace a {@link Match} node. */
+public class BeamMatchRel extends Match implements BeamRelNode {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BeamMatchRel.class);
+
+public BeamMatchRel(
+RelOptCluster cluster,
+RelTraitSet traitSet,
+RelNode input,
+RelDataType rowType,
+RexNode pattern,
+boolean strictStart,
+boolean strictEnd,
+Map<String, RexNode> patternDefinitions,
+Map<String, RexNode> measures,
+RexNode after,
+Map<String, ? extends SortedSet<String>> subsets,
+boolean allRows,
+List<RexNode> partitionKeys,
+RelCollation orderKeys,
+RexNode interval) {
+
+super(cluster,
+traitSet,
+input,
+rowType,
+pattern,
+strictStart,
+strictEnd,
+patternDefinitions,
+measures,
+after,
+subsets,
+allRows,
+partitionKeys,
+orderKeys,
+interval);
+
+}
+
+@Override
+public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+return BeamCostModel.FACTORY.makeTinyCost(); // return constant 
costModel for now
+}
+
+@Override
+public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+// a simple way of getting some estimate data
+// to be examined further
+NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(input, mq);
+double numRows = inputEstimate.getRowCount();
+double winSize = inputEstimate.getWindow();
+double rate = inputEstimate.getRate();
+
+return NodeStats.create(numRows, rate, winSize).multiply(0.5);
+}
+
+@Override
+public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+
+return new matchTransform(partitionKeys, orderKeys);
+}
+
+private static class matchTransform extends 
PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+private final List<RexNode> parKeys;
+private final RelCollation orderKeys;
+
+public matchTransform(List<RexNode> parKeys, RelCollation orderKeys) {
+this.parKeys = parKeys;
+this.orderKeys = orderKeys;
+}
+
+@Override
+public PCollection<Row> expand(PCollectionList<Row> pinput) {
+checkArgument(
+pinput.size() == 1,
+"Wrong number of inputs for %s: %s",
+BeamMatchRel.class.getSimpleName(),
+pinput);
+PCollection<Row> upstream = pinput.get(0);
+
+Schema collectionSchema = upstream.getSchema();
+
+Schema.Builder schemaBuilder = new Schema.Builder();
+for (RexNode i : parKeys) {
+RexVariable varNode = (RexVariable) i;
+int index = Integer.parseInt(varNode.getName().substring(1)); 
// get rid of `$`
+

[GitHub] [beam] mxm commented on a change in pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token

2020-06-24 Thread GitBox


mxm commented on a change in pull request #12062:
URL: https://github.com/apache/beam/pull/12062#discussion_r445147652



##
File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlersTest.java
##
@@ -59,4 +68,46 @@ public void testDelegatingStateHandlerThrowsWhenNotFound() 
throws Exception {
 StateRequestHandlers.delegateBasedUponType(new 
EnumMap<>(StateKey.TypeCase.class))
 .handle(StateRequest.getDefaultInstance());
   }
+
+  @Test
+  public void testUserStateCacheTokenGeneration() {
+ProcessBundleDescriptors.ExecutableProcessBundleDescriptor 
processBundleDescriptor =
+
Mockito.mock(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor.class);
+InMemoryBagUserStateFactory inMemoryBagUserStateFactory = new 
InMemoryBagUserStateFactory<>();
+StateRequestHandler stateRequestHandler =
+StateRequestHandlers.forBagUserStateHandlerFactory(
+processBundleDescriptor, inMemoryBagUserStateFactory);
+
+Iterable<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens =
+stateRequestHandler.getCacheTokens();
+assertThat(Iterables.size(cacheTokens), is(1));
+
+BeamFnApi.ProcessBundleRequest.CacheToken cacheToken = 
Iterables.getOnlyElement(cacheTokens);
+assertThat(
+cacheToken.getUserState(),
+
is(BeamFnApi.ProcessBundleRequest.CacheToken.UserState.getDefaultInstance()));
+assertThat(cacheToken.getToken(), is(notNullValue()));
+
+inMemoryBagUserStateFactory.forUserState(

Review comment:
   Looks like this should be using the StateRequestHandler instead, will 
update.









[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445147153



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRel.java
##
@@ -0,0 +1,279 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexVariable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.util.*;

Review comment:
   Same. Import class by class.









[GitHub] [beam] mxm commented on pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token

2020-06-24 Thread GitBox


mxm commented on pull request #12062:
URL: https://github.com/apache/beam/pull/12062#issuecomment-649046917


   Thanks for pointing out the duplication issue. The new version should 
properly fix it. Please have another look. 







[GitHub] [beam] TheNeuralBit commented on pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances

2020-06-24 Thread GitBox


TheNeuralBit commented on pull request #12067:
URL: https://github.com/apache/beam/pull/12067#issuecomment-649046980


   I suppose another place where global state is ill-advised is when running 
tests, since we run all the tests in the same process and many of them create 
pipelines. 
   
   Python precommits are failing because of this. It looks like the failing 
tests are making assertions about specific values of component keys which seems 
brittle, for example:
   
   ```
   self = 

   
   def test_cacheable_key_with_version_map(self):
 p = beam.Pipeline(interactive_runner.InteractiveRunner())
 # pylint: disable=range-builtin-not-iterating
 init_pcoll = p | 'Init Create' >> beam.Create(range(10))
   
 # It's normal that when executing, the pipeline object is a different
 # but equivalent instance from what user has built. The pipeline 
instrument
 # should be able to identify if the original instance has changed in an
 # interactive env while mutating the other instance for execution. The
 # version map can be used to figure out what the PCollection instances 
are
 # in the original instance and if the evaluation has changed since last
 # execution.
 p2 = beam.Pipeline(interactive_runner.InteractiveRunner())
 # pylint: disable=range-builtin-not-iterating
 init_pcoll_2 = p2 | 'Init Create' >> beam.Create(range(10))
 _, ctx = p2.to_runner_api(use_fake_coders=True, return_context=True)
   
 # The cacheable_key should use id(init_pcoll) as prefix even when
 # init_pcoll_2 is supplied as long as the version map is given.
 self.assertEqual(
 instr.cacheable_key(
 init_pcoll_2,
 instr.pcolls_to_pcoll_id(p2, ctx),
 {'ref_PCollection_PCollection_8': str(id(init_pcoll))}),
   > str(id(init_pcoll)) + '_ref_PCollection_PCollection_8')
   E AssertionError: '140176476148624_ref_PCollection_PCollection_4539' != 
'140176476499024_ref_PCollection_PCollection_8'
   E - 140176476148624_ref_PCollection_PCollection_4539
   E ?  - ^^   
   E + 140176476499024_ref_PCollection_PCollection_8
   E ?   ^^^  
   ```
   
   It's probably easier to do the work to make sure cached component ids are 
scoped to an individual pipeline than to fix all of these tests (and a global 
cache shared across tests will be problematic anyway).
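   Scoping the ID cache to a context instance rather than a process-global map, as suggested above, can be sketched in plain Java (illustrative names only; Beam's actual PipelineContext is Python and differs in detail):

   ```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: component IDs cached per context instance, so two
// contexts number components independently and tests sharing one process
// cannot interfere with each other's ID assignments.
class ComponentContext {
    private final Map<Object, String> ids = new HashMap<>();
    private int counter = 0;

    String uniqueRef(Object component, String prefix) {
        // computeIfAbsent keeps repeated lookups of the same component stable
        return ids.computeIfAbsent(
            component, c -> "ref_" + prefix + "_" + prefix + "_" + (++counter));
    }

    public static void main(String[] args) {
        Object pcoll = new Object();
        ComponentContext ctx1 = new ComponentContext();
        ComponentContext ctx2 = new ComponentContext();
        System.out.println(ctx1.uniqueRef(pcoll, "PCollection")); // ref_PCollection_PCollection_1
        // Same component, same context: the same ID comes back.
        System.out.println(ctx1.uniqueRef(pcoll, "PCollection")); // ref_PCollection_PCollection_1
        // A fresh context restarts numbering, unaffected by ctx1.
        System.out.println(ctx2.uniqueRef(pcoll, "PCollection")); // ref_PCollection_PCollection_1
    }
}
   ```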







[GitHub] [beam] ibzib opened a new pull request #12080: Return throwing state handler for default API descriptor.

2020-06-24 Thread GitBox


ibzib opened a new pull request #12080:
URL: https://github.com/apache/beam/pull/12080


   Follow-up from https://github.com/apache/beam/pull/12040/files#r445063845
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
 

[GitHub] [beam] ibzib merged pull request #12040: Revert "Fix state handler for missing service descriptor."

2020-06-24 Thread GitBox


ibzib merged pull request #12040:
URL: https://github.com/apache/beam/pull/12040


   







[GitHub] [beam] mxm commented on a change in pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token

2020-06-24 Thread GitBox


mxm commented on a change in pull request #12062:
URL: https://github.com/apache/beam/pull/12062#discussion_r445131904



##
File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java
##
@@ -37,13 +37,19 @@
 /**
  * Holds user state in memory. Only one key is active at a time due to the 
GroupReduceFunction being
  * called once per key. Needs to be reset via {@code resetForNewKey()} before 
processing a new key.
+ *
+ * In case of any failures, this factory must be discarded. Otherwise, the 
contained state cache
+ * token would be reused which would corrupt the state cache.
  */
 public class InMemoryBagUserStateFactory
 implements StateRequestHandlers.BagUserStateHandlerFactory {
 
+  private final ByteString cacheToken;
+
   private List handlers;
 
   public InMemoryBagUserStateFactory() {
+cacheToken = 
ByteString.copyFrom(UUID.randomUUID().toString().getBytes(Charsets.UTF_8));

Review comment:
   I see, makes sense. Let me update the PR.
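   The pattern under discussion, minting the token once per factory instance so every handler created by that factory shares it, can be sketched in plain Java (using `java.util.UUID` and UTF-8 bytes instead of Beam's vendored `ByteString`; all names here are illustrative):

   ```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

// Illustrative stand-in for InMemoryBagUserStateFactory's token handling.
class TokenPerFactory {
    private final byte[] cacheToken; // generated once, reused for the factory's lifetime

    TokenPerFactory() {
        cacheToken = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    }

    byte[] cacheToken() {
        return cacheToken;
    }

    public static void main(String[] args) {
        TokenPerFactory f1 = new TokenPerFactory();
        TokenPerFactory f2 = new TokenPerFactory();
        // Same factory: the same token on every call, so it is safe to cache against.
        System.out.println(Arrays.equals(f1.cacheToken(), f1.cacheToken())); // true
        // A fresh factory (e.g. after a failure) gets a new token, which
        // invalidates any state cached under the old one.
        System.out.println(Arrays.equals(f1.cacheToken(), f2.cacheToken())); // false
    }
}
   ```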









[GitHub] [beam] JozoVilcek commented on pull request #12064: [BEAM-10284] Add option to pass configuration into ParquetIO.Sink

2020-06-24 Thread GitBox


JozoVilcek commented on pull request #12064:
URL: https://github.com/apache/beam/pull/12064#issuecomment-649031901


   Hi, yes, the PR is to control the Parquet settings you mentioned. I was 
considering Map-based settings, but since the Parquet writer is Hadoop-based 
anyway, I thought there was no reason not to expose the full config. That 
said, I see no reason not to make it a Map if that is what you prefer; now I 
think it is probably the better choice.







[GitHub] [beam] ibzib commented on pull request #11738: [BEAM-9936] Create SDK harness containers with Python 3.8

2020-06-24 Thread GitBox


ibzib commented on pull request #11738:
URL: https://github.com/apache/beam/pull/11738#issuecomment-649032247


   I think this PR might have caused BEAM-10316, PTAL







[GitHub] [beam] Imfuyuwei opened a new pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and adde…

2020-06-24 Thread GitBox


Imfuyuwei opened a new pull request #12079:
URL: https://github.com/apache/beam/pull/12079


   R: @amaliujia
   CC: @kennknowles
   
   Added support for BIT_AND aggregation function in Beam SQL.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [x] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
 

[GitHub] [beam] epicfaace commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)

2020-06-24 Thread GitBox


epicfaace commented on pull request #11824:
URL: https://github.com/apache/beam/pull/11824#issuecomment-649021127


   @pabloem anything you need from me on this one?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib opened a new pull request #12078: [BEAM-10315] Fix gradlew clean.

2020-06-24 Thread GitBox


ibzib opened a new pull request #12078:
URL: https://github.com/apache/beam/pull/12078


   R: @kamilwu 
   
   
   

[GitHub] [beam] chamikaramj commented on pull request #12071: [BEAM-9932] Add documentation describing cross-language test pipelines

2020-06-24 Thread GitBox


chamikaramj commented on pull request #12071:
URL: https://github.com/apache/beam/pull/12071#issuecomment-649013366


   Retest this please







[GitHub] [beam] TheNeuralBit commented on pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner

2020-06-24 Thread GitBox


TheNeuralBit commented on pull request #11744:
URL: https://github.com/apache/beam/pull/11744#issuecomment-649012660


   Run Python PreCommit







[GitHub] [beam] allenpradeep commented on a change in pull request #12010: [BEAM-10259] Use ref-counted connection to Spanner to prevent multiple connections.

2020-06-24 Thread GitBox


allenpradeep commented on a change in pull request #12010:
URL: https://github.com/apache/beam/pull/12010#discussion_r445110010



##
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
##
@@ -31,33 +31,73 @@
 import io.grpc.ClientCall;
 import io.grpc.ClientInterceptor;
 import io.grpc.MethodDescriptor;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Manages lifecycle of {@link DatabaseClient} and {@link Spanner} instances. 
*/
 class SpannerAccessor implements AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SpannerAccessor.class);
+
   // A common user agent token that indicates that this request was originated 
from Apache Beam.
   private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
 
+  // Only create one SpannerAccessor for each different SpannerConfig.
+  private static final ConcurrentHashMap<SpannerConfig, SpannerAccessor> spannerAccessors =
+      new ConcurrentHashMap<>();
+
+  // Keep reference counts of each SpannerAccessor's usage so that we can close
+  // it when it is no longer in use.
+  private static final ConcurrentHashMap<SpannerConfig, AtomicInteger> refcounts =
+      new ConcurrentHashMap<>();
+
   private final Spanner spanner;
   private final DatabaseClient databaseClient;
   private final BatchClient batchClient;
   private final DatabaseAdminClient databaseAdminClient;
+  private final SpannerConfig spannerConfig;
 
   private SpannerAccessor(
   Spanner spanner,
   DatabaseClient databaseClient,
   DatabaseAdminClient databaseAdminClient,
-  BatchClient batchClient) {
+  BatchClient batchClient,
+  SpannerConfig spannerConfig) {
 this.spanner = spanner;
 this.databaseClient = databaseClient;
 this.databaseAdminClient = databaseAdminClient;
 this.batchClient = batchClient;
+this.spannerConfig = spannerConfig;
   }
 
-  static SpannerAccessor create(SpannerConfig spannerConfig) {

Review comment:
   NIT: Should we keep this function for compatibility's sake? There 
might be external customers using the create() call in their templates; 
would they be affected when they start using this code?
   Internally, this can call getOrCreate().
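
   The shim suggested above can be sketched as a thin compatibility alias: `create()` stays as the public entry point and simply forwards to a ref-counted `getOrCreate()`. This is an illustrative sketch only, not Beam's implementation; the class name is invented, the config key is simplified to a `String`, and the real class keys on `SpannerConfig` and holds actual client objects.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch: create() survives as a backward-compatible alias
   // that forwards to the ref-counted getOrCreate().
   class RefCountedAccessor implements AutoCloseable {
     private static final ConcurrentHashMap<String, RefCountedAccessor> accessors =
         new ConcurrentHashMap<>();
     private static final ConcurrentHashMap<String, AtomicInteger> refcounts =
         new ConcurrentHashMap<>();

     private final String config;

     private RefCountedAccessor(String config) {
       this.config = config;
     }

     // One shared accessor per distinct config; each call bumps the refcount.
     static RefCountedAccessor getOrCreate(String config) {
       refcounts.computeIfAbsent(config, k -> new AtomicInteger(0)).incrementAndGet();
       return accessors.computeIfAbsent(config, RefCountedAccessor::new);
     }

     // Compatibility alias so existing external callers keep working.
     static RefCountedAccessor create(String config) {
       return getOrCreate(config);
     }

     @Override
     public void close() {
       // The last close() for a given config drops the shared instance.
       AtomicInteger count = refcounts.get(config);
       if (count != null && count.decrementAndGet() == 0) {
         accessors.remove(config);
         refcounts.remove(config);
       }
     }
   }
   ```

   With this shape, old callers of `create()` transparently share the same ref-counted instance as new callers of `getOrCreate()`.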
   









[GitHub] [beam] chamikaramj commented on pull request #12010: [BEAM-10259] Use ref-counted connection to Spanner to prevent multiple connections.

2020-06-24 Thread GitBox


chamikaramj commented on pull request #12010:
URL: https://github.com/apache/beam/pull/12010#issuecomment-649008321


   Retest this please







[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r445107329



##
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import 
org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+  RelOptCluster cluster,
+  RelTraitSet traitSet,
+  RelNode input,
+  List<RexLiteral> constants,
+  RelDataType rowType,
+  List<Group> groups) {
+super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+Schema outputSchema = CalciteUtils.toSchema(getRowType());
+final List<FieldAggregation> analyticFields = Lists.newArrayList();
+this.groups.stream()
+.forEach(
+anAnalyticGroup -> {
+  List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+  List<Integer> orderByKeys = Lists.newArrayList();
+  List<Boolean> orderByDirections = Lists.newArrayList();
+  List<Boolean> orderByNullDirections = Lists.newArrayList();
+  anAnalyticGroup.orderKeys.getFieldCollations().stream()
+  .forEach(
+  fc -> {
+orderByKeys.add(fc.getFieldIndex());
+orderByDirections.add(
+fc.direction == 
RelFieldCollation.Direction.ASCENDING);
+orderByNullDirections.add(
+fc.nullDirection == 
RelFieldCollation.NullDirection.FIRST);
+  });
+  int lowerB = Integer.MAX_VALUE; // Unbounded by default
+  int upperB = Integer.MAX_VALUE; // Unbounded by default
+  if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+lowerB = 0;
+  } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+// pending
+  } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+// pending
+  }
+  if (anAnalyticGroup.upperBound.isCurrentRow()) {
+upperB = 0;
+  } else if (anAnalyticGroup.upperBound.isPreceding()) {
+// pending
+  } else if (anAnalyticGroup.upperBound.isFollowing()) {
+// pending
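
The CURRENT ROW / UNBOUNDED mapping in the branches above can be factored into a tiny standalone helper. This is an illustrative sketch only (the class, method name, and nullable-offset convention are inventions for the example, not Beam code); the literal PRECEDING/FOLLOWING branches that the snippet leaves pending would return the row-count offset, while `Integer.MAX_VALUE` stays the unbounded sentinel just as in the snippet.

```java
// Hypothetical helper mirroring the bound-mapping logic in BeamWindowRel.
final class WindowBounds {
  static final int UNBOUNDED = Integer.MAX_VALUE;

  // offsetLiteral is null for CURRENT ROW / UNBOUNDED, a row count otherwise.
  static int toOffset(boolean isCurrentRow, Integer offsetLiteral) {
    if (isCurrentRow) {
      return 0; // CURRENT ROW: no rows before/after the current one
    }
    if (offsetLiteral == null) {
      return UNBOUNDED; // UNBOUNDED PRECEDING / UNBOUNDED FOLLOWING
    }
    return offsetLiteral; // N PRECEDING / N FOLLOWING
  }
}
```

Both the lower and upper bound of a window group would go through the same mapping, each with its own flags.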

  1   2   >