[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=184094&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184094
 ]

ASF GitHub Bot logged work on BEAM-5918:


Author: ASF GitHub Bot
Created on: 11/Jan/19 04:42
Start Date: 11/Jan/19 04:42
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #7373: [BEAM-5918] Fix 
casting of non-numeric types
URL: https://github.com/apache/beam/pull/7373#issuecomment-453376610
 
 
   LGTM if my comment is wrong. Nice code & tests, again. There's some conflict 
to be resolved.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184094)
Time Spent: 9h 10m  (was: 9h)

> Add Cast transform for Rows
> ---
>
> Key: BEAM-5918
> URL: https://issues.apache.org/jira/browse/BEAM-5918
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Gleb Kanterov
>Assignee: Gleb Kanterov
>Priority: Major
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> There is a need for a generic transform that, given two Row schemas, will
> convert rows between them. There must be a way to opt out of certain kinds
> of conversions; for instance, converting ints to shorts can cause overflow.
> As another example, a schema could have a nullable field that never holds a
> NULL value in practice, because NULLs were filtered out.
> What is needed:
> - widening values (e.g., int -> long)
> - narrowing (e.g., int -> short)
> - runtime check for overflow while narrowing
> - ignoring nullability (nullable=true -> nullable=false)
> - weakening nullability (nullable=false -> nullable=true)
> - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32))
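For illustration, a minimal sketch of the intended usage, mirroring the Cast API exercised by the tests in PR #7373 (assumes a `Pipeline pipeline` in scope; schemas and values are examples only):

{code:java}
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Cast;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

Schema inputSchema =
    Schema.of(
        Schema.Field.of("f0", Schema.FieldType.INT16),
        Schema.Field.of("f1", Schema.FieldType.INT32));
// widen f0: INT16 -> INT32 and f1: INT32 -> INT64
Schema outputSchema =
    Schema.of(
        Schema.Field.of("f0", Schema.FieldType.INT32),
        Schema.Field.of("f1", Schema.FieldType.INT64));

Row input = Row.withSchema(inputSchema).addValues((short) 1, 2).build();

PCollection<Row> output =
    pipeline
        .apply(Create.of(input).withRowSchema(inputSchema))
        .apply(Cast.widening(outputSchema));
// Cast.narrowing(outputSchema) goes the other way and checks for overflow at runtime.
{code}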



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5918) Add Cast transform for Rows

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=184093&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184093
 ]

ASF GitHub Bot logged work on BEAM-5918:


Author: ASF GitHub Bot
Created on: 11/Jan/19 04:41
Start Date: 11/Jan/19 04:41
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7373: 
[BEAM-5918] Fix casting of non-numeric types
URL: https://github.com/apache/beam/pull/7373#discussion_r247002100
 
 

 ##
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
 ##
 @@ -36,137 +32,252 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 /** Tests for {@link Cast}. */
 public class CastTest {
 
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException expectedException = ExpectedException.none();
 
   @Test
   @Category(NeedsRunner.class)
-  public void testProjection() throws Exception {
-    Schema outputSchema = pipeline.getSchemaRegistry().getSchema(Projection2.class);
-    PCollection pojos =
+  public void testProjection() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32),
+            Schema.Field.of("f2", Schema.FieldType.STRING));
+
+    // remove f0 and reorder f1 and f2
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f2", Schema.FieldType.STRING),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Row input = Row.withSchema(inputSchema).addValues((short) 1, 2, "3").build();
+    Row expected = Row.withSchema(outputSchema).addValues("3", 2).build();
+
+    PCollection output =
         pipeline
-            .apply(Create.of(new Projection1()))
-            .apply(Cast.widening(outputSchema))
-            .apply(Convert.to(Projection2.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.widening(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
 
-    PAssert.that(pojos).containsInAnyOrder(new Projection2());
     pipeline.run();
   }
 
   @Test
   @Category(NeedsRunner.class)
-  public void testTypeWiden() throws Exception {
-    Schema outputSchema = pipeline.getSchemaRegistry().getSchema(TypeWiden2.class);
+  public void testTypeWiden() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
 
-    PCollection pojos =
+    Row input = Row.withSchema(inputSchema).addValues((short) 1, 2).build();
+    Row expected = Row.withSchema(outputSchema).addValues(1, 2L).build();
+
+    PCollection output =
         pipeline
-            .apply(Create.of(new TypeWiden1()))
-            .apply(Cast.widening(outputSchema))
-            .apply(Convert.to(TypeWiden2.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.widening(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
 
-    PAssert.that(pojos).containsInAnyOrder(new TypeWiden2());
     pipeline.run();
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testTypeNarrow() throws Exception {
-    // narrowing is the opposite of widening
-    Schema outputSchema = pipeline.getSchemaRegistry().getSchema(TypeWiden1.class);
+  public void testTypeWidenFail() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
 
-    PCollection pojos =
-        pipeline
-            .apply(Create.of(new TypeWiden2()))
-            .apply(Cast.narrowing(outputSchema))
-            .apply(Convert.to(TypeWiden1.class));
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
 
-    PAssert.that(pojos).containsInAnyOrder(new TypeWiden1());
-    pipeline.run();
-  }
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(containsString("f1: Can't cast 'INT64' to 'INT32'"));
 
-  @Test(expected = IllegalArgumentException.class)
-  @Category(NeedsRunner.class)
-  public void testTypeNarrowFail() throws Exception {
-    // narrowing is the opposite of widening
-    Schema inputSchema = pipeline.getSchemaRegistry().getSchema(TypeWiden2.class);
-    Schema outputSchema = pipeline.getSchemaRegistry().getSchema(TypeWiden1.class);
-
-

[jira] [Work logged] (BEAM-5723) CassandraIO is broken because of use of bad relocation of guava

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5723?focusedWorklogId=184095&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184095
 ]

ASF GitHub Bot logged work on BEAM-5723:


Author: ASF GitHub Bot
Created on: 11/Jan/19 04:44
Start Date: 11/Jan/19 04:44
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on pull request #7237: 
[BEAM-5723] Changed shadow plugin configuration to avoid relocating g…
URL: https://github.com/apache/beam/pull/7237#discussion_r247002267
 
 

 ##
 File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
 ##
 @@ -45,6 +43,8 @@
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 
 Review comment:
   I think the global search/replace is easy either way. I don't think the 
security issue matters for us, but we should just update anyhow.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184095)
Time Spent: 6.5h  (was: 6h 20m)

> CassandraIO is broken because of use of bad relocation of guava
> ---
>
> Key: BEAM-5723
> URL: https://issues.apache.org/jira/browse/BEAM-5723
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-cassandra
>Affects Versions: 2.5.0, 2.6.0, 2.7.0, 2.8.0, 2.9.0
>Reporter: Arun sethia
>Assignee: João Cabrita
>Priority: Major
> Fix For: 2.7.1, 2.10.0
>
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> While using Apache Beam to run a Dataflow job that reads data from BigQuery
> and stores/writes to Cassandra with the following libraries:
>  # beam-sdks-java-io-cassandra - 2.6.0
>  # beam-sdks-java-io-jdbc - 2.6.0
>  # beam-sdks-java-io-google-cloud-platform - 2.6.0
>  # beam-sdks-java-core - 2.6.0
>  # google-cloud-dataflow-java-sdk-all - 2.5.0
>  # google-api-client -1.25.0
>  
> I am getting the following error at the time of inserting/saving data to Cassandra.
> {code:java}
> [error] (run-main-0) org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.NoSuchMethodError: 
> com.datastax.driver.mapping.Mapper.saveAsync(Ljava/lang/Object;)Lorg/apache/beam/repackaged/beam_sdks_java_io_cassandra/com/google/common/util/concurrent/ListenableFuture;
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.NoSuchMethodError: 
> com.datastax.driver.mapping.Mapper.saveAsync(Ljava/lang/Object;)Lorg/apache/beam/repackaged/beam_sdks_java_io_cassandra/com/google/common/util/concurrent/ListenableFuture;
>  at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>  at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299){code}
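For context, a hedged diagnostic sketch (not from the PR): reflectively check which ListenableFuture the datastax `Mapper.saveAsync` on the classpath returns. If it reports a relocated `org.apache.beam.repackaged...` type, the badly shaded jar described in this issue is in use.

{code:java}
import java.lang.reflect.Method;

public class CheckSaveAsync {
  public static void main(String[] args) throws Exception {
    // Mapper.saveAsync(T) erases to saveAsync(Object)
    Class<?> mapper = Class.forName("com.datastax.driver.mapping.Mapper");
    Method saveAsync = mapper.getMethod("saveAsync", Object.class);
    // Prints com.google.common.util.concurrent.ListenableFuture for the
    // unshaded driver; the broken CassandraIO expects the relocated name,
    // hence the NoSuchMethodError above.
    System.out.println(saveAsync.getReturnType().getName());
  }
}
{code}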



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6231) Triage test failures introduced by use_executable_stage_bundle_execution

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6231?focusedWorklogId=184080&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184080
 ]

ASF GitHub Bot logged work on BEAM-6231:


Author: ASF GitHub Bot
Created on: 11/Jan/19 03:00
Start Date: 11/Jan/19 03:00
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on pull request #7356: [BEAM-6231] 
Make Dataflow runner harness work with FixedWindow
URL: https://github.com/apache/beam/pull/7356#discussion_r246991823
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
 ##
 @@ -191,6 +195,37 @@ public Node apply(MutableNetwork input) {
         Iterables.filter(input.nodes(), InstructionOutputNode.class)) {
       InstructionOutput instructionOutput = node.getInstructionOutput();
 
+      // If this is the input PCollection or the output PCollection for an ExecutableStage, it's
+      // necessary to check whether the window coder is not a GlobalWindow coder.
+      if (isExecutableStageInputPCollection(input, node)
+          || isExecutableStageOutputPCollection(input, node)) {
+        Coder javaCoder =
+            CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));
+        // For now, the Dataflow runner harness only deals with FixedWindow.
+        if (FullWindowedValueCoder.class.isInstance(javaCoder)) {
+          FullWindowedValueCoder windowedValueCoder = (FullWindowedValueCoder) javaCoder;
+          Coder windowCoder = windowedValueCoder.getWindowCoder();
+          if (IntervalWindowCoder.class.isInstance(windowCoder)) {
+            fakeWindowingStrategyId = "generatedFixedWindowingStrategy" + idGenerator.getId();
+            try {
+              // Since the coder is the only thing needed from a WindowingStrategy, the size field of one
 
 Review comment:
   If we don't want to lie about the internal PCollections, then it's not possible to build a totally fake WindowingStrategy. For internal PCollections inside one ExecutableStage, it needs to initialize the WindowFn from the WindowingStrategy proto.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184080)
Time Spent: 4h 20m  (was: 4h 10m)

> Triage test failures introduced by use_executable_stage_bundle_execution
> 
>
> Key: BEAM-6231
> URL: https://issues.apache.org/jira/browse/BEAM-6231
> Project: Beam
>  Issue Type: Test
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6231) Triage test failures introduced by use_executable_stage_bundle_execution

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6231?focusedWorklogId=184079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184079
 ]

ASF GitHub Bot logged work on BEAM-6231:


Author: ASF GitHub Bot
Created on: 11/Jan/19 02:58
Start Date: 11/Jan/19 02:58
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #7356: [BEAM-6231] Make 
Dataflow runner harness work with FixedWindow
URL: https://github.com/apache/beam/pull/7356#issuecomment-453357404
 
 
   commit https://github.com/apache/beam/pull/7356/commits/4f3eaf51db8959e3563fafa786c4305352a08896 addressed the comments above:
   
   - The function will register a GlobalWindowingStrategy and a FixedWindowingStrategy into the componentBuilder before creating all PCollections.
   - Instead of only caring about the input and output PCollections, it will put the right kind of WindowingStrategy, based on the window coder, on every PCollection. This logic only applies to Java coders; for Python, GlobalWindow is still used as the default. (A sketch of this dispatch follows below.)
   
   Test log: https://scans.gradle.com/s/kameqe3xqgg4u/tests
   
   @robertwb Could you please review these latest changes?
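A minimal sketch of that dispatch (the helper and strategy-id names are illustrative, not the PR's code):

{code:java}
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;

class WindowingStrategyDispatch {
  // ids assumed to be registered in the componentBuilder up front
  static final String GLOBAL_ID = "generatedGlobalWindowingStrategy";
  static final String FIXED_ID = "generatedFixedWindowingStrategy";

  // Pick a windowing strategy id for a PCollection from its Java coder;
  // anything not windowed by IntervalWindow falls back to the global strategy.
  static String strategyIdFor(Coder<?> javaCoder) {
    if (javaCoder instanceof FullWindowedValueCoder) {
      Coder<?> windowCoder = ((FullWindowedValueCoder<?>) javaCoder).getWindowCoder();
      if (windowCoder instanceof IntervalWindowCoder) {
        return FIXED_ID;
      }
    }
    return GLOBAL_ID; // also the default when there is no Java coder (e.g. Python)
  }
}
{code}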
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184079)
Time Spent: 4h 10m  (was: 4h)

> Triage test failures introduced by use_executable_stage_bundle_execution
> 
>
> Key: BEAM-6231
> URL: https://issues.apache.org/jira/browse/BEAM-6231
> Project: Beam
>  Issue Type: Test
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6138) Add User Metric Support to Java SDK

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=184067&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184067
 ]

ASF GitHub Bot logged work on BEAM-6138:


Author: ASF GitHub Bot
Created on: 11/Jan/19 01:37
Start Date: 11/Jan/19 01:37
Worklog Time Spent: 10m 
  Work Description: ajamato commented on issue #7482: [BEAM-6138] Refactor 
the start and finish function registration so that PTransform IDs can be 
properly injected onto user counters.
URL: https://github.com/apache/beam/pull/7482#issuecomment-453336281
 
 
   @Ardagan @robertwb 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184067)
Time Spent: 5h 40m  (was: 5.5h)

> Add User Metric Support to Java SDK
> ---
>
> Key: BEAM-6138
> URL: https://issues.apache.org/jira/browse/BEAM-6138
> Project: Beam
>  Issue Type: New Feature
>  Components: java-fn-execution
>Reporter: Alex Amato
>Assignee: Alex Amato
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6138) Add User Metric Support to Java SDK

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=184072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184072
 ]

ASF GitHub Bot logged work on BEAM-6138:


Author: ASF GitHub Bot
Created on: 11/Jan/19 01:37
Start Date: 11/Jan/19 01:37
Worklog Time Spent: 10m 
  Work Description: ajamato commented on issue #7482: [BEAM-6138] Refactor 
the start and finish function registration so that PTransform IDs can be 
properly injected onto user counters.
URL: https://github.com/apache/beam/pull/7482#issuecomment-453336421
 
 
@robertwb let me know if you would like to review in addition, or if there is someone else you would suggest here. Otherwise @Ardagan can review.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184072)
Time Spent: 5h 50m  (was: 5h 40m)

> Add User Metric Support to Java SDK
> ---
>
> Key: BEAM-6138
> URL: https://issues.apache.org/jira/browse/BEAM-6138
> Project: Beam
>  Issue Type: New Feature
>  Components: java-fn-execution
>Reporter: Alex Amato
>Assignee: Alex Amato
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6138) Add User Metric Support to Java SDK

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=184066&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184066
 ]

ASF GitHub Bot logged work on BEAM-6138:


Author: ASF GitHub Bot
Created on: 11/Jan/19 01:36
Start Date: 11/Jan/19 01:36
Worklog Time Spent: 10m 
  Work Description: ajamato commented on pull request #7482: [BEAM-6138] 
Refactor the start and finish function registration so that PTransform IDs can 
be properly injected onto user counters.
URL: https://github.com/apache/beam/pull/7482
 
 
   [BEAM-6138] Refactor the start and finish function registration so that 
PTransform IDs can be properly injected onto user counters.
   
   **Please** add a meaningful description for your change here
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch): standard build-badge table (Go/Java/Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners) omitted.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact 

[jira] [Commented] (BEAM-6138) Add User Metric Support to Java SDK

2019-01-10 Thread Alex Amato (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739923#comment-16739923
 ] 

Alex Amato commented on BEAM-6138:
--

Some initial PRs are checked in, but I made a mistake: I used the bundle ID instead of the step ID on the counter field.

In order to properly inject the PTransform ID, I will need to refactor the code a bit so that it knows which PTransform ID it is invoking before it calls the start, process, and finish functions. Working on a PR for that now, doing the basic refactor first.

This will also come in handy later when we instrument the code to record execution times: since we will have wrapping code around the UDFs, we can record/set a state when we enter and exit the process, finish, and start UDFs. A rough sketch of that wrapping is below.
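A rough sketch of that wrapping (the class here is hypothetical; `MetricsEnvironment.scopedMetricsContainer` and `MetricsContainerImpl` are existing Beam APIs):

{code:java}
import java.io.Closeable;
import java.io.IOException;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.sdk.metrics.MetricsEnvironment;

class PTransformScopedInvoker {
  private final MetricsContainerImpl container;

  PTransformScopedInvoker(String pTransformId) {
    // key the container by the step (PTransform) id, not the bundle id
    this.container = new MetricsContainerImpl(pTransformId);
  }

  // Run a user function (start/process/finish) with the step's container
  // active, so user counters created inside attach to the right PTransform.
  // The same enter/exit hook is where execution-time states can be recorded.
  void invoke(Runnable userFn) throws IOException {
    try (Closeable scope = MetricsEnvironment.scopedMetricsContainer(container)) {
      userFn.run();
    }
  }
}
{code}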

> Add User Metric Support to Java SDK
> ---
>
> Key: BEAM-6138
> URL: https://issues.apache.org/jira/browse/BEAM-6138
> Project: Beam
>  Issue Type: New Feature
>  Components: java-fn-execution
>Reporter: Alex Amato
>Assignee: Alex Amato
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6161) Add ElementCount MonitoringInfos for the Java SDK

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6161?focusedWorklogId=184013&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184013
 ]

ASF GitHub Bot logged work on BEAM-6161:


Author: ASF GitHub Bot
Created on: 10/Jan/19 23:53
Start Date: 10/Jan/19 23:53
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on pull request #7272: [BEAM-6161] 
Introduce PCollectionConsumerRegistry and add ElementCoun…
URL: https://github.com/apache/beam/pull/7272#discussion_r246964094
 
 

 ##
 File path: sdks/go/pkg/beam/transforms/stats/stats.shims.go
 ##
 @@ -68,31 +68,31 @@ func init() {
runtime.RegisterType(reflect.TypeOf((*meanAccum)(nil)).Elem())
 
 Review comment:
   I believe this file should not be present in this change.
   It seems to be a formatting-only change, though.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184013)
Time Spent: 9h 10m  (was: 9h)

> Add ElementCount MonitoringInfos for the Java SDK
> -
>
> Key: BEAM-6161
> URL: https://issues.apache.org/jira/browse/BEAM-6161
> Project: Beam
>  Issue Type: New Feature
>  Components: java-fn-execution, sdk-java-harness
>Reporter: Alex Amato
>Assignee: Alex Amato
>Priority: Major
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-6352) Watch PTransform is broken

2019-01-10 Thread Kenneth Knowles (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739877#comment-16739877
 ] 

Kenneth Knowles commented on BEAM-6352:
---

I agree this is a blocker. Let me know if I can help.

> Watch PTransform is broken
> --
>
> Key: BEAM-6352
> URL: https://issues.apache.org/jira/browse/BEAM-6352
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.9.0
>Reporter: Gleb Kanterov
>Assignee: Boyuan Zhang
>Priority: Blocker
> Fix For: 2.10.0
>
>
> List of affected tests:
> org.apache.beam.sdk.transforms.WatchTest > 
> testSinglePollMultipleInputsWithSideInput FAILED
> org.apache.beam.sdk.transforms.WatchTest > testMultiplePollsWithKeyExtractor 
> FAILED
> org.apache.beam.sdk.transforms.WatchTest > testSinglePollMultipleInputs FAILED
> org.apache.beam.sdk.transforms.WatchTest > 
> testMultiplePollsWithTerminationDueToTerminationCondition FAILED
> org.apache.beam.sdk.transforms.WatchTest > testMultiplePollsWithManyResults 
> FAILED
> org.apache.beam.sdk.transforms.WatchTest > testSinglePollWithManyResults 
> FAILED
> org.apache.beam.sdk.transforms.WatchTest > 
> testMultiplePollsStopAfterTimeSinceNewOutput 
> org.apache.beam.sdk.transforms.WatchTest > 
> testMultiplePollsWithTerminationBecauseOutputIsFinal FAILED
> org.apache.beam.sdk.io.AvroIOTest$NeedsRunnerTests > 
> testContinuouslyWriteAndReadMultipleFilepatterns[0: true] FAILED
> org.apache.beam.sdk.io.AvroIOTest$NeedsRunnerTests > 
> testContinuouslyWriteAndReadMultipleFilepatterns[1: false] FAILED
> org.apache.beam.sdk.io.FileIOTest > testMatchWatchForNewFiles FAILED
> org.apache.beam.sdk.io.TextIOReadTest$BasicIOTest > testReadWatchForNewFiles 
> FAILED
> {code}
> java.lang.IllegalArgumentException: org.apache.beam.sdk.transforms.Watch$WatchGrowthFn, @ProcessElement process(ProcessContext, GrowthTracker): Has tracker type Watch.GrowthTracker, but the DoFn's tracker type must be of type RestrictionTracker.
> {code}
> Relevant pull requests:
> - https://github.com/apache/beam/pull/6467
> - https://github.com/apache/beam/pull/7374
> Now the tests are marked with @Ignore referencing this JIRA issue.
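For context, a minimal sketch (with a hypothetical restriction type) of the shape the DoFn analysis quoted above expects: the tracker parameter must be declared as RestrictionTracker, not as a concrete subclass such as Watch.GrowthTracker.

{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class ExampleSplittableFn extends DoFn<String, String> {
  static class MyRestriction {} // hypothetical restriction type

  @ProcessElement
  public void process(ProcessContext c, RestrictionTracker<MyRestriction, Void> tracker) {
    // claim positions via tracker, emit via c.output(...)
  }
}
{code}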



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5396) Flink portable runner savepoint / upgrade support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5396?focusedWorklogId=184010&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184010
 ]

ASF GitHub Bot logged work on BEAM-5396:


Author: ASF GitHub Bot
Created on: 10/Jan/19 23:37
Start Date: 10/Jan/19 23:37
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7362: [BEAM-5396] Assign 
portable operator uids
URL: https://github.com/apache/beam/pull/7362#issuecomment-453302620
 
 
   Run Python Flink ValidatesRunner
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184010)
Time Spent: 4h 20m  (was: 4h 10m)

> Flink portable runner savepoint / upgrade support
> -
>
> Key: BEAM-5396
> URL: https://issues.apache.org/jira/browse/BEAM-5396
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to support Flink savepoints for production 
> use. It should be possible to upgrade a stateful portable Beam pipeline that 
> runs on Flink, which involves taking a savepoint and then starting the new 
> version of the pipeline from that savepoint. The potential issues with 
> pipeline evolution and migration are similar to those when using the Flink 
> DataStream API (schema / name changes etc.).
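For illustration (plain Flink DataStream API, not Beam's runner code): stable operator uids are what let a savepoint taken from one version of a job be mapped back onto the operators of the upgraded job, which is what PR #7362 assigns for portable pipelines.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3)
        .map(
            new MapFunction<Integer, Integer>() {
              @Override
              public Integer map(Integer x) {
                return x * 2;
              }
            })
        .uid("double-values") // stable across job versions, so state restores here
        .print();
    env.execute("uid-example");
  }
}
{code}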



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-6401) NullPointerException and regression in BeamCalcRel

2019-01-10 Thread Andrew Pilloud (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739863#comment-16739863
 ] 

Andrew Pilloud commented on BEAM-6401:
--

Opened a calcite issue: CALCITE-2783

Because you have a workaround I'm going to stop working on this for the moment.

> NullPointerException and regression in BeamCalcRel
> --
>
> Key: BEAM-6401
> URL: https://issues.apache.org/jira/browse/BEAM-6401
> Project: Beam
>  Issue Type: Bug
>  Components: dsl-sql
>Reporter: Gleb Kanterov
>Assignee: Andrew Pilloud
>Priority: Major
>
> Test case to reproduce:
> {code}
> Schema schema =
>     Schema.builder()
>         .addNullableField("f0", Schema.FieldType.BOOLEAN)
>         .build();
> String sql =
>     "SELECT SUM(case when coalesce(f0, true) = true then 1 else 0 end) " +
>         "FROM PCOLLECTION";
> List<Row> rows = ImmutableList.of(
>     Row.withSchema(schema).addValue(false).build(),
>     Row.withSchema(schema).addValue(true).build(),
>     Row.withSchema(schema).addValue(null).build());
> PCollection<Row> input = pipeline.apply("rows", Create.of(rows).withRowSchema(schema));
>
> PAssert.that(input.apply(SqlTransform.query(sql))).satisfies(matchesScalar(2));
> pipeline.run();
> {code}
> A similar test case doesn't throw an NPE, but fails:
> {code}
> Schema schema =
>     Schema.builder()
>         .addNullableField("f0", Schema.FieldType.BOOLEAN)
>         .build();
> String sql =
>     "SELECT SUM(case when coalesce(f0, false) = true then 1 else 0 end) " +
>         "FROM PCOLLECTION";
> List<Row> rows = ImmutableList.of(
>     Row.withSchema(schema).addValue(false).build(),
>     Row.withSchema(schema).addValue(true).build(),
>     Row.withSchema(schema).addValue(null).build());
> PCollection<Row> input = pipeline.apply("rows", Create.of(rows).withRowSchema(schema));
>
> PAssert.that(input.apply(SqlTransform.query(sql))).satisfies(matchesScalar(1));
> pipeline.run();
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6382) SamzaRunner: add an option to read configs using a user-defined factory

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6382?focusedWorklogId=184009&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184009
 ]

ASF GitHub Bot logged work on BEAM-6382:


Author: ASF GitHub Bot
Created on: 10/Jan/19 23:30
Start Date: 10/Jan/19 23:30
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7464: 
[BEAM-6382] Cherry pick pr #7443 into 2.10.0 release branch
URL: https://github.com/apache/beam/pull/7464
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 184009)
Time Spent: 1.5h  (was: 1h 20m)

> SamzaRunner: add an option to read configs using a user-defined factory
> ---
>
> Key: BEAM-6382
> URL: https://issues.apache.org/jira/browse/BEAM-6382
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-samza
>Reporter: Xinyu Liu
>Assignee: Xinyu Liu
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> We need an option to read configs from a factory, which is useful in YARN as
> well as for user-defined file formats. By default, this config factory reads
> a property file.
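A hypothetical sketch of the idea (`ConfigFactory` is Samza's existing interface; the YARN handling shown is an assumption):

{code:java}
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.MapConfig;

public class MyConfigFactory implements ConfigFactory {
  @Override
  public Config getConfig(URI configUri) {
    // e.g. resolve configUri against YARN-localized resources, or parse a
    // user-defined file format, instead of a plain .properties file
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "my-beam-job");
    return new MapConfig(config);
  }
}
{code}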



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-6343) NeedsRunner tests not running in precommit

2019-01-10 Thread Gleb Kanterov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Kanterov closed BEAM-6343.
---

> NeedsRunner tests not running in precommit
> --
>
> Key: BEAM-6343
> URL: https://issues.apache.org/jira/browse/BEAM-6343
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Gleb Kanterov
>Priority: Critical
> Fix For: 2.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-6133) [SQL] Add support for TableMacro UDF

2019-01-10 Thread Gleb Kanterov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Kanterov closed BEAM-6133.
---

> [SQL] Add support for TableMacro UDF
> 
>
> Key: BEAM-6133
> URL: https://issues.apache.org/jira/browse/BEAM-6133
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Gleb Kanterov
>Assignee: Gleb Kanterov
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Now we support only ScalarFunction UDFs. In Calcite, there are other kinds of
> UDFs. With TableMacro UDFs, users can connect external data sources in a
> similar way as with TableProvider, but without specifying a schema or
> enumerating a list of existing tables in advance.
> An example use case is connecting to an external metadata service and
> querying a range of partitions.
> {code}
> SELECT COUNT(*) FROM table(my_udf('dataset', start = '2017-01-01', end = '2018-01-01'))
> {code}
> Here the implementation of `my_udf` connects to the service, gets file
> locations for a range of partitions, and translates them into a PTransform reading them.
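A hedged sketch of the Calcite side only (`TableMacro` is Calcite's real interface; the metadata-service lookup is hypothetical):

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.calcite.schema.FunctionParameter;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.schema.TranslatableTable;

public class PartitionRangeMacro implements TableMacro {
  @Override
  public TranslatableTable apply(List<Object> arguments) {
    String dataset = (String) arguments.get(0);
    String start = (String) arguments.get(1);
    String end = (String) arguments.get(2);
    return tableForPartitions(dataset, start, end);
  }

  @Override
  public List<FunctionParameter> getParameters() {
    return Collections.emptyList(); // parameter metadata elided in this sketch
  }

  // Hypothetical: ask the metadata service for file locations in [start, end)
  // and return a TranslatableTable whose scan becomes a PTransform read.
  private TranslatableTable tableForPartitions(String dataset, String start, String end) {
    throw new UnsupportedOperationException("metadata-service lookup goes here");
  }
}
{code}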



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-5675) RowCoder#verifyDeterministic isn't consistent with Beam coders

2019-01-10 Thread Gleb Kanterov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Kanterov closed BEAM-5675.
---

> RowCoder#verifyDeterministic isn't consistent with Beam coders
> --
>
> Key: BEAM-5675
> URL: https://issues.apache.org/jira/browse/BEAM-5675
> Project: Beam
>  Issue Type: Bug
>  Components: dsl-sql
>Affects Versions: 2.7.0
>Reporter: Gleb Kanterov
>Assignee: Gleb Kanterov
>Priority: Major
> Fix For: 2.9.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> For instance, `DoubleCoder#verifyDeterministic` throws an exception, but
> `RowCoder#verifyDeterministic` doesn't.
> The following is expected to throw NonDeterministicException:
> {code:java}
> RowCoder.of(
>     Schema
>         .builder()
>         .addField("foo", Schema.FieldType.DOUBLE)
>         .build()
> ).verifyDeterministic();
> {code}
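A minimal sketch (not necessarily the merged fix) of a `verifyDeterministic` that is consistent with other Beam coders: delegate to the per-field coders, so a `DOUBLE` field propagates `DoubleCoder`'s NonDeterministicException (`fieldCoders` stands in for the row's component coders):

{code:java}
@Override
public void verifyDeterministic() throws NonDeterministicException {
  // Coder.verifyDeterministic(target, message, coders) rethrows the first
  // NonDeterministicException raised by any component coder.
  Coder.verifyDeterministic(this, "All fields must have deterministic coders", fieldCoders);
}
{code}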



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-6353) TFRecordIOTest is failing

2019-01-10 Thread Gleb Kanterov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Kanterov closed BEAM-6353.
---

> TFRecordIOTest is failing
> -
>
> Key: BEAM-6353
> URL: https://issues.apache.org/jira/browse/BEAM-6353
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Gleb Kanterov
>Assignee: Gleb Kanterov
>Priority: Major
> Fix For: 2.10.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-5866) RowCoder doesn't implement structuralValue

2019-01-10 Thread Gleb Kanterov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Kanterov resolved BEAM-5866.
-
   Resolution: Fixed
Fix Version/s: 2.10.0

> RowCoder doesn't implement structuralValue
> --
>
> Key: BEAM-5866
> URL: https://issues.apache.org/jira/browse/BEAM-5866
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Gleb Kanterov
>Assignee: Gleb Kanterov
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> These two properties fail for RowCoder with a `BYTES` field or a `Map` field.
> {code}
>   public static <T> void testConsistentWithEquals(Coder<T> coder, T example) {
>     assumeTrue(coder.consistentWithEquals());
>     byte[] bytes = encodeBytes(coder, example);
>     // even if the coder is non-deterministic, if the encoded bytes match,
>     // coder is consistent with equals, decoded values must be equal
>     T out0 = decodeBytes(coder, bytes);
>     T out1 = decodeBytes(coder, bytes);
>     assertEquals("If the encoded bytes match, decoded values must be equal", out0, out1);
>     assertEquals(
>         "If two values are equal, their hash codes must be equal",
>         out0.hashCode(),
>         out1.hashCode());
>   }
>
>   public static <T> void testStructuralValueConsistentWithEquals(Coder<T> coder, T example) {
>     byte[] bytes = encodeBytes(coder, example);
>     // even if coder is non-deterministic, if the encoded bytes match,
>     // structural values must be equal
>     Object out0 = coder.structuralValue(decodeBytes(coder, bytes));
>     Object out1 = coder.structuralValue(decodeBytes(coder, bytes));
>     assertEquals("If the encoded bytes match, structural values must be equal", out0, out1);
>     assertEquals(
>         "If two values are equal, their hash codes must be equal",
>         out0.hashCode(),
>         out1.hashCode());
>   }
> {code}
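For context on why `BYTES` (and maps of bytes) fields trip these properties: Java arrays use identity equality, so decoded-but-equal rows holding `byte[]` compare unequal unless `structuralValue` (or `equals`) compares array contents.

{code:java}
import java.util.Arrays;

public class BytesEqualityDemo {
  public static void main(String[] args) {
    byte[] a = {1, 2, 3};
    byte[] b = {1, 2, 3};
    System.out.println(a.equals(b));         // false: Object identity equality
    System.out.println(Arrays.equals(a, b)); // true: content equality
  }
}
{code}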



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-6388) org.apache.beam.sdk.io.clickhouse.AtomicInsertTest.testAtomicInsert in Java PreCommit failed

2019-01-10 Thread Gleb Kanterov (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gleb Kanterov resolved BEAM-6388.
-
   Resolution: Fixed
Fix Version/s: Not applicable

> org.apache.beam.sdk.io.clickhouse.AtomicInsertTest.testAtomicInsert in Java 
> PreCommit failed
> 
>
> Key: BEAM-6388
> URL: https://issues.apache.org/jira/browse/BEAM-6388
> Project: Beam
>  Issue Type: Test
>  Components: test-failures
>Reporter: Boyuan Zhang
>Assignee: Gleb Kanterov
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> https://builds.apache.org/job/beam_PreCommit_Java_Cron/801/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183977
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246944156
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -114,7 +114,7 @@ def get_version():
 REQUIRED_PACKAGES = [
     'crcmod>=1.7,<2.0',
     'fastavro>=0.21.4,<0.22',
-    'pyarrow>=0.11.1,<0.12.0',
+    'pyarrow==0.11.1',
 
 Review comment:
   I don't think this PR should change this requirement.  Can you please 
clarify why this is needed?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183977)
Time Spent: 5h  (was: 4h 50m)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183988
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:37
Start Date: 10/Jan/19 22:37
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246946880
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -102,42 +98,13 @@ def Cancel(self, request, context=None):
 
   def GetStateStream(self, request, context=None):
     job = self._jobs[request.job_id]
-    state_queue = queue.Queue()
-    job.add_state_change_callback(state_queue.put)
-    try:
-      current_state = state_queue.get()
-    except queue.Empty:
-      current_state = job.state
-    yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
-    while current_state not in TERMINAL_STATES:
-      current_state = state_queue.get(block=True)
-      yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
+    for state in job.GetStateStream():
+      yield state
 
 Review comment:
   Done
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183988)
Time Spent: 3h 10m  (was: 3h)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> [https://scans.gradle.com/s/do3hjulee3gaa/console-log?task=:beam-sdks-python:testPython3]
> {code:java}
> 'second' not found in 'Traceback (most recent call last):\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 466, in test_error_traceback_includes_user_code\np | 
> beam.Create([0]) | beam.Map(first)  # pylint: 
> disable=expression-not-assigned\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/pipeline.py",
>  line 425, in __exit__\nself.run().wait_until_finish()\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/portable_runner.py",
>  line 314, in wait_until_finish\nself._job_id, self._state, 
> self._last_error_message()))\nRuntimeError: Pipeline 
> job-cdcefe6d-1caa-4487-9e63-e971f67ec68c failed in state FAILED: start 
>  coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>\n'{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6382) SamzaRunner: add an option to read configs using a user-defined factory

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6382?focusedWorklogId=183987&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183987
 ]

ASF GitHub Bot logged work on BEAM-6382:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:35
Start Date: 10/Jan/19 22:35
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on issue #7464: [BEAM-6382] 
Cherry pick pr #7443 into 2.10.0 release branch
URL: https://github.com/apache/beam/pull/7464#issuecomment-453281862
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183987)
Time Spent: 1h 20m  (was: 1h 10m)

> SamzaRunner: add an option to read configs using a user-defined factory
> ---
>
> Key: BEAM-6382
> URL: https://issues.apache.org/jira/browse/BEAM-6382
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-samza
>Reporter: Xinyu Liu
>Assignee: Xinyu Liu
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> We need an option to read configs from a factory, which is useful in YARN as
> well as for user-defined file formats. By default, this config factory reads
> a property file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183986&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183986
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:34
Start Date: 10/Jan/19 22:34
Worklog Time Spent: 10m 
  Work Description: juan-rael commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246946061
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -114,7 +114,7 @@ def get_version():
 REQUIRED_PACKAGES = [
     'crcmod>=1.7,<2.0',
     'fastavro>=0.21.4,<0.22',
-    'pyarrow>=0.11.1,<0.12.0',
+    'pyarrow==0.11.1',
 
 Review comment:
   I got a conflict yesterday on the PR; when I checked the tests, there was an error in the 0.12.0 version of pyarrow...
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183986)
Time Spent: 6h 10m  (was: 6h)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183981&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183981
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246942663
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_test.py
 ##
 @@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtable_io_write import BigtableWriteConfiguration
+from apache_beam.io.gcp.bigtable_io_write import WriteToBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+
+
+class GenerateDirectRows(beam.DoFn):
+  """ Generates an iterator of DirectRow object to process on beam pipeline.
+
+  """
+
+  def process(self, row_values):
+    """ Process beam pipeline using an element.
+
+    :type row_value: dict
+    :param row_value: dict: dict values with row_key and row_content having
+        family, column_id and value of row.
+    """
+    direct_row = row.DirectRow(row_key=row_values["row_key"])
+
+    for row_value in row_values["row_content"]:
+      direct_row.set_cell(row_value["column_family_id"],
+                          row_value["column_id"],
+                          row_value["value"],
+                          datetime.datetime.now())
+      yield direct_row
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteIT(unittest.TestCase):
+  """ Bigtable Write Connector Test
+
+  """
+  DEFAULT_TABLE_PREFIX = "python-test"
+  instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  number = 500
+  LOCATION_ID = "us-east1-b"
+
+  def setUp(self):
+    try:
+      from google.cloud.bigtable import enums
+      self.STORAGE_TYPE = enums.StorageType.HDD
+      self.INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT
+    except ImportError:
+      self.STORAGE_TYPE = 2
+      self.INSTANCE_TYPE = 2
+
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+    self.client = Client(project=self.project, admin=True)
+    self._create_instance_table()
+
+  def tearDown(self):
+    if self.instance.exists():
+      self.instance.delete()
+
+  def test_bigtable_write_python(self):
+    number = self.number
+    config = BigtableWriteConfiguration(self.project, self.instance_id,
+                                        self.table_id)
+    pipeline_args = self.test_pipeline.options_list
+    pipeline_options = PipelineOptions(pipeline_args)
+    rows = self._generate_mutation_data(number)
+
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+      _ = (
+          pipeline
+          | 'Generate Row Values' >> beam.Create(rows)
+          | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+          | 'Write to BT' >> beam.ParDo(WriteToBigtable(config)))
+
+      result = pipeline.run()
+      result.wait_until_finish()
+
+      assert result.state == PipelineState.DONE
+      assert self._check_table(number)
+      if not hasattr(result, 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183978
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246943526
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_write.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+BigTable connector can be used as main outputs. A main output
+(common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below
+we created a list of rows then passed to the GeneratedDirectRows
+DoFn to set the Cells and then we call the WriteToBigtable to insert
+those generated rows in the table.
+
+  main_table = (p
+                | 'Generate Row Values' >> beam.Create(row_values)
+                | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+                | 'Write to BT' >> beam.ParDo(WriteToBigtable(beam_options)))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.internal.gcp import auth
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+except ImportError:
+  pass
+
+
+class WriteToBigtable(beam.DoFn):
+  """ Creates the connector can call and add_row to the batcher using each
+  row in beam pipe line
+
+  :type beam_options: class:`~bigtable_configuration.BigtableConfiguration`
+  :param beam_options: class `~bigtable_configuration.BigtableConfiguration`
+  """
+
+  def __init__(self, beam_options):
+    super(WriteToBigtable, self).__init__(beam_options)
+    self.project_id = beam_options.project_id
 
 Review comment:
   can we simply have a `self.beam_options` instead of duplicating all of the 
variables?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183978)
Time Spent: 5h 10m  (was: 5h)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183980
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246942425
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_test.py
 ##
 @@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtable_io_write import BigtableWriteConfiguration
+from apache_beam.io.gcp.bigtable_io_write import WriteToBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+
+
+class GenerateDirectRows(beam.DoFn):
+  """ Generates an iterator of DirectRow object to process on beam pipeline.
+
+  """
+
+  def process(self, row_values):
+    """ Processes one element of the beam pipeline.
+
+    :type row_values: dict
+    :param row_values: a dict with a row_key and a row_content list, each
+      entry carrying the column family, column id and value of a cell.
+    """
+    direct_row = row.DirectRow(row_key=row_values["row_key"])
+
+    for row_value in row_values["row_content"]:
+      direct_row.set_cell(row_value["column_family_id"],
+                          row_value["column_id"],
+                          row_value["value"],
+                          datetime.datetime.now())
+      yield direct_row
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteIT(unittest.TestCase):
+  """ Bigtable Write Connector Test.
+  """
+  DEFAULT_TABLE_PREFIX = "python-test"
+  instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  number = 500
+  LOCATION_ID = "us-east1-b"
+
+  def setUp(self):
+    try:
+      from google.cloud.bigtable import enums
+      self.STORAGE_TYPE = enums.StorageType.HDD
+      self.INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT
+    except ImportError:
+      self.STORAGE_TYPE = 2
+      self.INSTANCE_TYPE = 2
+
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+    self.client = Client(project=self.project, admin=True)
+    self._create_instance_table()
+
+  def tearDown(self):
+    if self.instance.exists():
+      self.instance.delete()
+
+  def test_bigtable_write_python(self):
+    number = self.number
+    config = BigtableWriteConfiguration(self.project, self.instance_id,
+                                        self.table_id)
+    pipeline_args = self.test_pipeline.options_list
+    pipeline_options = PipelineOptions(pipeline_args)
+    rows = self._generate_mutation_data(number)
+
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+      _ = (
+          pipeline
+          | 'Generate Row Values' >> beam.Create(rows)
+          | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+          | 'Write to BT' >> beam.ParDo(WriteToBigtable(config)))
+
+      result = pipeline.run()
+      result.wait_until_finish()
+
+      assert result.state == PipelineState.DONE
+      assert self._check_table(number)
+      if not hasattr(result, 

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183979&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183979
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246943243
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_write.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+BigTable connector can be used as main outputs. A main output
+(common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below
+we created a list of rows then passed to the GeneratedDirectRows
+DoFn to set the Cells and then we call the WriteToBigtable to insert
+those generated rows in the table.
+
+  main_table = (p
+   | 'Generate Row Values' >> beam.Create(row_values)
+   | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+   | 'Write to BT' >> beam.ParDo(WriteToBigtable(beam_options)))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.internal.gcp import auth
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+except ImportError:
+  pass
+
+
+class WriteToBigtable(beam.DoFn):
+  """ Creates the connector can call and add_row to the batcher using each
+  row in beam pipe line
+
+  :type beam_options: class:`~bigtable_configuration.BigtableConfiguration`
+  :param beam_options: class `~bigtable_configuration.BigtableConfiguration`
+  """
+
+  def __init__(self, beam_options):
+    super(WriteToBigtable, self).__init__(beam_options)
+    self.project_id = beam_options.project_id
+    self.instance_id = beam_options.instance_id
+    self.table_id = beam_options.table_id
+    self.credentials = auth.get_service_credentials()
+
+    self.client = None
+    self.instance = None
+    self.table = None
+    self.batcher = None
+    self.flush_count = None
 
 Review comment:
   please remove `self.flush_count` and `self.max_row_bytes`.
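
  A hedged sketch of the trimmed constructor (names follow the PR; the
  batcher's built-in flush defaults would replace the two removed fields):

      def __init__(self, beam_options):
        super(WriteToBigtable, self).__init__()
        self.project_id = beam_options.project_id
        self.instance_id = beam_options.instance_id
        self.table_id = beam_options.table_id
        # No flush_count / max_row_bytes here: MutationsBatcher applies
        # its own flush defaults when it is built in start_bundle.
        self.client = None
        self.instance = None
        self.table = None
        self.batcher = None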
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183979)
Time Spent: 5h 20m  (was: 5h 10m)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183983&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183983
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246943656
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_write.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+BigTable connector can be used as main outputs. A main output
+(common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below
+we created a list of rows then passed to the GeneratedDirectRows
+DoFn to set the Cells and then we call the WriteToBigtable to insert
+those generated rows in the table.
+
+  main_table = (p
+   | 'Generate Row Values' >> beam.Create(row_values)
+   | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+   | 'Write to BT' >> beam.ParDo(WriteToBigtable(beam_options)))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.internal.gcp import auth
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+except ImportError:
+  pass
+
+
+class WriteToBigtable(beam.DoFn):
+  """ Creates the connector can call and add_row to the batcher using each
+  row in beam pipe line
+
+  :type beam_options: class:`~bigtable_configuration.BigtableConfiguration`
+  :param beam_options: class `~bigtable_configuration.BigtableConfiguration`
+  """
+
+  def __init__(self, beam_options):
+    super(WriteToBigtable, self).__init__(beam_options)
+    self.project_id = beam_options.project_id
+    self.instance_id = beam_options.instance_id
+    self.table_id = beam_options.table_id
+    self.credentials = auth.get_service_credentials()
+
+    self.client = None
+    self.instance = None
+    self.table = None
+    self.batcher = None
+    self.flush_count = None
+    self.max_row_bytes = None
+    self._app_profile_id = beam_options.app_profile_id
+    self.written = Metrics.counter(self.__class__, 'Written Row')
+
+  def start_bundle(self):
+    if self.client is None:
+      if self.credentials is None:
 
 Review comment:
   I don't think we need this if statement.  We should just need the 
`Client(project=self.project_id, admin=True)` variation.
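
  A minimal sketch of the suggested simplification, assuming the client
  library resolves application-default credentials on its own:

      def start_bundle(self):
        if self.client is None:
          # google-cloud-bigtable picks up credentials itself, so no
          # explicit credentials plumbing is needed here.
          self.client = Client(project=self.project_id, admin=True)
        self.instance = self.client.instance(self.instance_id)
        self.table = self.instance.table(self.table_id)
        self.batcher = MutationsBatcher(self.table)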
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183983)
Time Spent: 6h  (was: 5h 50m)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183982
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:27
Start Date: 10/Jan/19 22:27
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246943013
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_write.py
 ##
 @@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+BigTable connector can be used as main outputs. A main output
+(common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below
+we created a list of rows then passed to the GeneratedDirectRows
+DoFn to set the Cells and then we call the WriteToBigtable to insert
+those generated rows in the table.
+
+  main_table = (p
+   | 'Generate Row Values' >> beam.Create(row_values)
+   | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+   | 'Write to BT' >> beam.ParDo(WriteToBigtable(beam_options)))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.internal.gcp import auth
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+except ImportError:
+  pass
+
+
+class WriteToBigtable(beam.DoFn):
+  """ Creates the connector can call and add_row to the batcher using each
+  row in beam pipe line
+
+  :type beam_options: class:`~bigtable_configuration.BigtableConfiguration`
+  :param beam_options: class `~bigtable_configuration.BigtableConfiguration`
+  """
+
+  def __init__(self, beam_options):
+    super(WriteToBigtable, self).__init__(beam_options)
+    self.project_id = beam_options.project_id
+    self.instance_id = beam_options.instance_id
+    self.table_id = beam_options.table_id
+    self.credentials = auth.get_service_credentials()
 
 Review comment:
  I don't think we need this.  Shouldn't the Cloud Bigtable library already do 
this?  I think we can completely get rid of `self.credentials`.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183982)
Time Spent: 5h 50m  (was: 5h 40m)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183970&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183970
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:13
Start Date: 10/Jan/19 22:13
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246940490
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-   job_id,
-   pipeline_options,
-   pipeline_proto):
+  def __init__(self, pipeline_proto):
 super(BeamJob, self).__init__()
-self._job_id = job_id
-self._pipeline_options = pipeline_options
 self._pipeline_proto = pipeline_proto
 self._state = None
-self._state_change_callbacks = []
-self._last_log_message = None
-self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+self._logs = []
+self._final_log_count = -1
+self._state_changes = []
 self.state = beam_job_api_pb2.JobState.STARTING
-self.daemon = True
-
-  def add_state_change_callback(self, f):
-self._state_change_callbacks.append(f)
-f(self.state)
-
-  def add_log_callback(self, f):
-self._log_callbacks.append(f)
 
   @property
   def state(self):
 return self._state
 
   @state.setter
   def state(self, new_state):
-for state_change_callback in self._state_change_callbacks:
-  state_change_callback(new_state)
+"""Sets the job state.
+
+This will inform GetStateStream and GetMessageStream of the new state.
+"""
+
+self._state_changes.append(new_state)
+self._logs.append(
+beam_job_api_pb2.JobMessagesResponse(
+state_response=beam_job_api_pb2.GetJobStateResponse(
+state=new_state)))
 self._state = new_state
 
 Review comment:
   I think this is fine
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183970)
Time Spent: 3h  (was: 2h 50m)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> [https://scans.gradle.com/s/do3hjulee3gaa/console-log?task=:beam-sdks-python:testPython3]
> {code:java}
> 'second' not found in 'Traceback (most recent call last):\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 466, in test_error_traceback_includes_user_code\np | 
> beam.Create([0]) | beam.Map(first)  # pylint: 
> disable=expression-not-assigned\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/pipeline.py",
>  line 425, in __exit__\nself.run().wait_until_finish()\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/portable_runner.py",
>  line 314, in wait_until_finish\nself._job_id, self._state, 
> self._last_error_message()))\nRuntimeError: Pipeline 
> job-cdcefe6d-1caa-4487-9e63-e971f67ec68c failed in state FAILED: start 
>  coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>\n'{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183973
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:18
Start Date: 10/Jan/19 22:18
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246941825
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_test.py
 ##
 @@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtable_io_write import BigtableWriteConfiguration
+from apache_beam.io.gcp.bigtable_io_write import WriteToBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+
+
+class GenerateDirectRows(beam.DoFn):
+  """ Generates an iterator of DirectRow object to process on beam pipeline.
+
+  """
+
+  def process(self, row_values):
+    """ Processes one element of the beam pipeline.
+
+    :type row_values: dict
+    :param row_values: a dict with a row_key and a row_content list, each
+      entry carrying the column family, column id and value of a cell.
+    """
+    direct_row = row.DirectRow(row_key=row_values["row_key"])
+
+    for row_value in row_values["row_content"]:
+      direct_row.set_cell(row_value["column_family_id"],
+                          row_value["column_id"],
+                          row_value["value"],
+                          datetime.datetime.now())
+      yield direct_row
 
 Review comment:
   Please pass in just an index into process, and call 
`_generate_mutation_data`, which should return a full `DirectRow`.  I think 
that would be a bit more efficient.
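
  For illustration, a sketch of that shape; `_generate_mutation_data` is the
  helper named in the comment (shown here as a free function), assumed to
  build one fully populated `DirectRow` per index:

      import apache_beam as beam

      class GenerateDirectRows(beam.DoFn):
        """Builds one DirectRow per input index (sketch)."""

        def process(self, index):
          # Assumed helper: returns a populated
          # google.cloud.bigtable.row.DirectRow for the given index.
          yield _generate_mutation_data(index)

      # Usage sketch:
      #   p | beam.Create(range(number)) | beam.ParDo(GenerateDirectRows())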
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183973)
Time Spent: 4h 50m  (was: 4h 40m)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5319) Finish Python 3 porting for runners module

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5319?focusedWorklogId=183972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183972
 ]

ASF GitHub Bot logged work on BEAM-5319:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:17
Start Date: 10/Jan/19 22:17
Worklog Time Spent: 10m 
  Work Description: tvalentyn commented on pull request #7445: [BEAM-5319] 
Python 3 port runners module
URL: https://github.com/apache/beam/pull/7445#discussion_r246941557
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -215,9 +203,6 @@ def test_gbk_side_input(self):
   main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
   equal_to([(None, {'a': [1]})]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
 
 Review comment:
  Since we are targeting Python 3.5 right now, we don't have to add 3.6 fixes 
in this PR, but I would prefer to unskip such tests only for the 3.x versions 
that the test actually passes on.
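
  As a concrete sketch, the skip can then be gated on the specific minor
  versions where the test still fails (the version tuple and message here
  are illustrative):

      import sys
      import unittest

      class FnApiRunnerTest(unittest.TestCase):  # illustrative context
        @unittest.skipIf(sys.version_info[:2] == (3, 6),
                         'TODO: fails on Python 3.6; passes on 3.5.')
        def test_gbk_side_input(self):
          pass  # real test body unchanged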
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183972)
Time Spent: 5h 40m  (was: 5.5h)

> Finish Python 3 porting for runners module
> --
>
> Key: BEAM-5319
> URL: https://issues.apache.org/jira/browse/BEAM-5319
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Robbe
>Assignee: Robbe
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183966&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183966
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:12
Start Date: 10/Jan/19 22:12
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246940113
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-   job_id,
-   pipeline_options,
-   pipeline_proto):
+  def __init__(self, pipeline_proto):
 super(BeamJob, self).__init__()
-self._job_id = job_id
-self._pipeline_options = pipeline_options
 self._pipeline_proto = pipeline_proto
 self._state = None
-self._state_change_callbacks = []
-self._last_log_message = None
-self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+self._logs = []
+self._final_log_count = -1
+self._state_changes = []
 self.state = beam_job_api_pb2.JobState.STARTING
-self.daemon = True
-
-  def add_state_change_callback(self, f):
-self._state_change_callbacks.append(f)
-f(self.state)
-
-  def add_log_callback(self, f):
-self._log_callbacks.append(f)
 
   @property
   def state(self):
 return self._state
 
   @state.setter
   def state(self, new_state):
-for state_change_callback in self._state_change_callbacks:
-  state_change_callback(new_state)
+"""Sets the job state.
+
+This will inform GetStateStream and GetMessageStream of the new state.
+"""
+
+self._state_changes.append(new_state)
+self._logs.append(
+beam_job_api_pb2.JobMessagesResponse(
+state_response=beam_job_api_pb2.GetJobStateResponse(
+state=new_state)))
 self._state = new_state
 
+  def _cleanup(self):
+self._final_log_count = len(self._logs)
+
   def run(self):
-with JobLogHandler(self._log_callbacks):
+with JobLogHandler(self._logs):
   try:
 fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto)
 logging.info('Successfully completed job.')
 self.state = beam_job_api_pb2.JobState.DONE
   except:  # pylint: disable=bare-except
 logging.exception('Error running pipeline.')
-traceback.print_exc()
+logging.exception(traceback.format_exc())
 self.state = beam_job_api_pb2.JobState.FAILED
 raise
+  finally:
+# In order for consumers to read all messages, this must be the final
+# instruction after a terminal state.
+self._cleanup()
 
   def cancel(self):
 if self.state not in TERMINAL_STATES:
   self.state = beam_job_api_pb2.JobState.CANCELLING
   # TODO(robertwb): Actually cancel...
   self.state = beam_job_api_pb2.JobState.CANCELLED
+  self._cleanup()
+
+  def GetStateStream(self):
+"""Returns all past and future states.
+
+This method guarantees that the consumer will see all job state
+transitions.
+"""
+state_index = 0
+
+# Pull all state changes until the job finishes.
+while self.state not in TERMINAL_STATES:
+  while state_index < len(self._state_changes):
 
 Review comment:
   Changed to use condition variables
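
  A minimal sketch of the condition-variable pattern (hypothetical, not the
  PR's exact code): the producer appends and notifies under the lock, and
  readers block instead of busy-polling:

      import threading

      class MessageLog(object):
        """Append-only log with blocking readers (sketch)."""

        def __init__(self):
          self._cond = threading.Condition()
          self._items = []
          self._done = False

        def append(self, item):
          with self._cond:
            self._items.append(item)
            self._cond.notify_all()

        def close(self):
          with self._cond:
            self._done = True
            self._cond.notify_all()

        def read(self):
          """Yields every item; blocks until the log is closed."""
          index = 0
          while True:
            with self._cond:
              while index >= len(self._items) and not self._done:
                self._cond.wait()
              if index >= len(self._items):  # closed and fully drained
                return
              item = self._items[index]
            index += 1
            yield item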
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183966)
Time Spent: 2h 20m  (was: 2h 10m)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> 

[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183969&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183969
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:13
Start Date: 10/Jan/19 22:13
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246940427
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-   job_id,
-   pipeline_options,
-   pipeline_proto):
+  def __init__(self, pipeline_proto):
 super(BeamJob, self).__init__()
-self._job_id = job_id
-self._pipeline_options = pipeline_options
 self._pipeline_proto = pipeline_proto
 self._state = None
-self._state_change_callbacks = []
-self._last_log_message = None
-self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+self._logs = []
+self._final_log_count = -1
+self._state_changes = []
 self.state = beam_job_api_pb2.JobState.STARTING
-self.daemon = True
-
-  def add_state_change_callback(self, f):
-self._state_change_callbacks.append(f)
-f(self.state)
-
-  def add_log_callback(self, f):
-self._log_callbacks.append(f)
 
   @property
   def state(self):
 return self._state
 
   @state.setter
   def state(self, new_state):
-for state_change_callback in self._state_change_callbacks:
-  state_change_callback(new_state)
+"""Sets the job state.
+
+This will inform GetStateStream and GetMessageStream of the new state.
+"""
+
+self._state_changes.append(new_state)
+self._logs.append(
+beam_job_api_pb2.JobMessagesResponse(
+state_response=beam_job_api_pb2.GetJobStateResponse(
+state=new_state)))
 self._state = new_state
 
+  def _cleanup(self):
+self._final_log_count = len(self._logs)
+
   def run(self):
-with JobLogHandler(self._log_callbacks):
+with JobLogHandler(self._logs):
   try:
 fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto)
 logging.info('Successfully completed job.')
 self.state = beam_job_api_pb2.JobState.DONE
   except:  # pylint: disable=bare-except
 logging.exception('Error running pipeline.')
-traceback.print_exc()
+logging.exception(traceback.format_exc())
 self.state = beam_job_api_pb2.JobState.FAILED
 raise
+  finally:
+# In order for consumers to read all messages, this must be the final
+# instruction after a terminal state.
+self._cleanup()
 
   def cancel(self):
 if self.state not in TERMINAL_STATES:
   self.state = beam_job_api_pb2.JobState.CANCELLING
   # TODO(robertwb): Actually cancel...
   self.state = beam_job_api_pb2.JobState.CANCELLED
+  self._cleanup()
+
+  def GetStateStream(self):
+"""Returns all past and future states.
+
+This method guarantees that the consumer will see all job state
+transitions.
+"""
+state_index = 0
+
+# Pull all state changes until the job finishes.
+while self.state not in TERMINAL_STATES:
 
 Review comment:
   Changed the state to work the same as the logs.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183969)
Time Spent: 2h 50m  (was: 2h 40m)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> [https://scans.gradle.com/s/do3hjulee3gaa/console-log?task=:beam-sdks-python:testPython3]
> {code:java}
> 

[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183968&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183968
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:12
Start Date: 10/Jan/19 22:12
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246940214
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-   job_id,
-   pipeline_options,
-   pipeline_proto):
+  def __init__(self, pipeline_proto):
 super(BeamJob, self).__init__()
-self._job_id = job_id
-self._pipeline_options = pipeline_options
 self._pipeline_proto = pipeline_proto
 self._state = None
-self._state_change_callbacks = []
-self._last_log_message = None
-self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+self._logs = []
+self._final_log_count = -1
+self._state_changes = []
 self.state = beam_job_api_pb2.JobState.STARTING
-self.daemon = True
-
-  def add_state_change_callback(self, f):
-self._state_change_callbacks.append(f)
-f(self.state)
-
-  def add_log_callback(self, f):
-self._log_callbacks.append(f)
 
   @property
   def state(self):
 return self._state
 
   @state.setter
   def state(self, new_state):
-for state_change_callback in self._state_change_callbacks:
-  state_change_callback(new_state)
+"""Sets the job state.
+
+This will inform GetStateStream and GetMessageStream of the new state.
+"""
+
+self._state_changes.append(new_state)
+self._logs.append(
+beam_job_api_pb2.JobMessagesResponse(
+state_response=beam_job_api_pb2.GetJobStateResponse(
+state=new_state)))
 self._state = new_state
 
+  def _cleanup(self):
+self._final_log_count = len(self._logs)
+
   def run(self):
-with JobLogHandler(self._log_callbacks):
+with JobLogHandler(self._logs):
   try:
 fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto)
 logging.info('Successfully completed job.')
 self.state = beam_job_api_pb2.JobState.DONE
   except:  # pylint: disable=bare-except
 logging.exception('Error running pipeline.')
-traceback.print_exc()
+logging.exception(traceback.format_exc())
 self.state = beam_job_api_pb2.JobState.FAILED
 raise
+  finally:
+# In order for consumers to read all messages, this must be the final
+# instruction after a terminal state.
+self._cleanup()
 
   def cancel(self):
 if self.state not in TERMINAL_STATES:
   self.state = beam_job_api_pb2.JobState.CANCELLING
   # TODO(robertwb): Actually cancel...
   self.state = beam_job_api_pb2.JobState.CANCELLED
+  self._cleanup()
+
+  def GetStateStream(self):
+"""Returns all past and future states.
+
+This method guarantees that the consumer will see all job state
+transitions.
+"""
+state_index = 0
+
+# Pull all state changes until the job finishes.
+while self.state not in TERMINAL_STATES:
+  while state_index < len(self._state_changes):
+state = self._state_changes[state_index]
 
 Review comment:
   Done
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183968)
Time Spent: 2h 40m  (was: 2.5h)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> 

[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183967
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:12
Start Date: 10/Jan/19 22:12
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246940127
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-   job_id,
-   pipeline_options,
-   pipeline_proto):
+  def __init__(self, pipeline_proto):
 super(BeamJob, self).__init__()
-self._job_id = job_id
-self._pipeline_options = pipeline_options
 self._pipeline_proto = pipeline_proto
 self._state = None
-self._state_change_callbacks = []
-self._last_log_message = None
-self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+self._logs = []
+self._final_log_count = -1
+self._state_changes = []
 self.state = beam_job_api_pb2.JobState.STARTING
-self.daemon = True
-
-  def add_state_change_callback(self, f):
-self._state_change_callbacks.append(f)
-f(self.state)
-
-  def add_log_callback(self, f):
-self._log_callbacks.append(f)
 
   @property
   def state(self):
 return self._state
 
   @state.setter
   def state(self, new_state):
-for state_change_callback in self._state_change_callbacks:
-  state_change_callback(new_state)
+"""Sets the job state.
+
+This will inform GetStateStream and GetMessageStream of the new state.
+"""
+
+self._state_changes.append(new_state)
+self._logs.append(
+beam_job_api_pb2.JobMessagesResponse(
+state_response=beam_job_api_pb2.GetJobStateResponse(
+state=new_state)))
 self._state = new_state
 
+  def _cleanup(self):
+self._final_log_count = len(self._logs)
+
   def run(self):
-with JobLogHandler(self._log_callbacks):
+with JobLogHandler(self._logs):
   try:
 fn_api_runner.FnApiRunner().run_via_runner_api(self._pipeline_proto)
 logging.info('Successfully completed job.')
 self.state = beam_job_api_pb2.JobState.DONE
   except:  # pylint: disable=bare-except
 logging.exception('Error running pipeline.')
-traceback.print_exc()
+logging.exception(traceback.format_exc())
 self.state = beam_job_api_pb2.JobState.FAILED
 raise
+  finally:
+# In order for consumers to read all messages, this must be the final
+# instruction after a terminal state.
+self._cleanup()
 
   def cancel(self):
 if self.state not in TERMINAL_STATES:
   self.state = beam_job_api_pb2.JobState.CANCELLING
   # TODO(robertwb): Actually cancel...
   self.state = beam_job_api_pb2.JobState.CANCELLED
+  self._cleanup()
+
+  def GetStateStream(self):
+"""Returns all past and future states.
+
+This method guarantees that the consumer will see all job state
+transitions.
+"""
+state_index = 0
+
+# Pull all state changes until the job finishes.
+while self.state not in TERMINAL_STATES:
+  while state_index < len(self._state_changes):
+state = self._state_changes[state_index]
+yield beam_job_api_pb2.GetJobStateResponse(state=state)
+state_index += 1
+yield beam_job_api_pb2.GetJobStateResponse(state=self.state)
+
+  def GetMessageStream(self):
+"""Returns all past and future messages.
+
+This method guarantees that the consumer will see all messages the job
+generates until it terminates.
+"""
+log_index = 0
+
+# Subscribers start with the first message and incrementally yield
+# subsequent logs. This process repeats until the job terminates and we
+# know the final number of log messages generated.
+while log_index != self._final_log_count:
 
 Review comment:
   see above
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183967)
Time Spent: 2.5h  (was: 2h 20m)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: 

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=183964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183964
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:07
Start Date: 10/Jan/19 22:07
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on issue #7323: [BEAM-6181] 
Implemented msec counters support in FnApi world.
URL: https://github.com/apache/beam/pull/7323#issuecomment-453273147
 
 
   run java precommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183964)
Time Spent: 12h 20m  (was: 12h 10m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> New approach to report metrics in FnApi is to utilize MetricInfo structures.
> This approach is implemented in Python SDK and work is ongoing in Java SDK.
> This tasks includes plumbing User metrics reported via MetricInfos through 
> Dataflow Java Runner. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183963&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183963
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 22:06
Start Date: 10/Jan/19 22:06
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246938240
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_test.py
 ##
 @@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtable_io_write import BigtableWriteConfiguration
+from apache_beam.io.gcp.bigtable_io_write import WriteToBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+
+
+class GenerateDirectRows(beam.DoFn):
+  """ Generates an iterator of DirectRow object to process on beam pipeline.
+
+  """
+
+  def process(self, row_values):
+    """ Processes one element of the beam pipeline.
+
+    :type row_values: dict
+    :param row_values: a dict with a row_key and a row_content list, each
+      entry carrying the column family, column id and value of a cell.
+    """
+    direct_row = row.DirectRow(row_key=row_values["row_key"])
+
+    for row_value in row_values["row_content"]:
+      direct_row.set_cell(row_value["column_family_id"],
+                          row_value["column_id"],
+                          row_value["value"],
+                          datetime.datetime.now())
+      yield direct_row
 
 Review comment:
   The yield should only happen once.
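
  In code, that means building the whole row inside the loop and yielding it
  exactly once afterwards (sketch; names follow the test under review):

      def process(self, row_values):
        direct_row = row.DirectRow(row_key=row_values["row_key"])
        for row_value in row_values["row_content"]:
          direct_row.set_cell(row_value["column_family_id"],
                              row_value["column_id"],
                              row_value["value"],
                              datetime.datetime.now())
        # Yield once, after all cells have been set.
        yield direct_row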
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183963)
Time Spent: 4h 40m  (was: 4.5h)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-6406) Add SonarQube analysis to jenkins jobs.

2019-01-10 Thread Mikhail Gryzykhin (JIRA)
Mikhail Gryzykhin created BEAM-6406:
---

 Summary: Add SonarQube analysis to jenkins jobs.
 Key: BEAM-6406
 URL: https://issues.apache.org/jira/browse/BEAM-6406
 Project: Beam
  Issue Type: New Feature
  Components: build-system
Reporter: Mikhail Gryzykhin
Assignee: Mikhail Gryzykhin


We are looking for tools that can provide dashboards for static analysis 
reports and set up quality gates for our code.

One of the tools that can handle this and is provided by Apache is 
[SonarQube|https://cwiki.apache.org/confluence/display/INFRA/SonarQube+Analysis].

We want to add SonarQube analysis to our builds to evaluate whether it is 
feasible to adopt it more broadly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5396) Flink portable runner savepoint / upgrade support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5396?focusedWorklogId=183958&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183958
 ]

ASF GitHub Bot logged work on BEAM-5396:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:47
Start Date: 10/Jan/19 21:47
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7362: [BEAM-5396] Assign 
portable operator uids
URL: https://github.com/apache/beam/pull/7362#issuecomment-453266406
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183958)
Time Spent: 4h 10m  (was: 4h)

> Flink portable runner savepoint / upgrade support
> -
>
> Key: BEAM-5396
> URL: https://issues.apache.org/jira/browse/BEAM-5396
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to support Flink savepoints for production 
> use. It should be possible to upgrade a stateful portable Beam pipeline that 
> runs on Flink, which involves taking a savepoint and then starting the new 
> version of the pipeline from that savepoint. The potential issues with 
> pipeline evolution and migration are similar to those when using the Flink 
> DataStream API (schema / name changes etc.).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3612) Make it easy to generate type-specialized Go SDK reflectx.Funcs

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3612?focusedWorklogId=183956&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183956
 ]

ASF GitHub Bot logged work on BEAM-3612:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:44
Start Date: 10/Jan/19 21:44
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #7361: [BEAM-3612] 
Generate type assertion shims for beam.
URL: https://github.com/apache/beam/pull/7361
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183956)
Time Spent: 9h 10m  (was: 9h)

> Make it easy to generate type-specialized Go SDK reflectx.Funcs
> ---
>
> Key: BEAM-3612
> URL: https://issues.apache.org/jira/browse/BEAM-3612
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Robert Burke
>Priority: Major
>  Time Spent: 9h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3612) Make it easy to generate type-specialized Go SDK reflectx.Funcs

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3612?focusedWorklogId=183953&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183953
 ]

ASF GitHub Bot logged work on BEAM-3612:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:41
Start Date: 10/Jan/19 21:41
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #7361: [BEAM-3612] Generate 
type assertion shims for beam.
URL: https://github.com/apache/beam/pull/7361#issuecomment-453264533
 
 
   @aaltay  Please merge! Thanks! :D 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183953)
Time Spent: 9h  (was: 8h 50m)

> Make it easy to generate type-specialized Go SDK reflectx.Funcs
> ---
>
> Key: BEAM-3612
> URL: https://issues.apache.org/jira/browse/BEAM-3612
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Robert Burke
>Priority: Major
>  Time Spent: 9h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5396) Flink portable runner savepoint / upgrade support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5396?focusedWorklogId=183948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183948
 ]

ASF GitHub Bot logged work on BEAM-5396:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:27
Start Date: 10/Jan/19 21:27
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7362: [BEAM-5396] Assign 
portable operator uids
URL: https://github.com/apache/beam/pull/7362#issuecomment-453260382
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183948)
Time Spent: 4h  (was: 3h 50m)

> Flink portable runner savepoint / upgrade support
> -
>
> Key: BEAM-5396
> URL: https://issues.apache.org/jira/browse/BEAM-5396
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to support Flink savepoints for production 
> use. It should be possible to upgrade a stateful portable Beam pipeline that 
> runs on Flink, which involves taking a savepoint and then starting the new 
> version of the pipeline from that savepoint. The potential issues with 
> pipeline evolution and migration are similar to those when using the Flink 
> DataStream API (schema / name changes etc.).
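
For context on why stable operator identifiers matter for this issue: Flink
maps savepoint state back to operators by uid, so an upgraded pipeline can
only restore from a savepoint if its operators keep the same uids across
versions. This is why the Beam Flink runner must assign deterministic uids
during translation. A minimal Flink-side sketch of the mechanism (a hedged
illustration with made-up names, not Beam's actual translation code):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StableUidExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Integer> doubled =
        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
              @Override
              public Integer map(Integer x) {
                return x * 2;
              }
            })
            // Savepoint state is matched to this operator by the uid below on
            // restore; if the uid changes between versions, restoring fails.
            .uid("double-values");

    doubled.print();
    env.execute("stable-uid-example");
  }
}
{code}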



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183936
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:18
Start Date: 10/Jan/19 21:18
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453257301
 
 
   Only 2 tests fail :)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183936)
Time Spent: 2.5h  (was: 2h 20m)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.
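
As an illustration of the knob in question (a hedged sketch; the actual fix is
in the Jenkins/Gradle test configuration rather than user code), the Flink
runner's parallelism can be pinned through FlinkPipelineOptions instead of
defaulting to the number of available cores:

{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParallelismExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // A small fixed parallelism avoids overloading shared build machines.
    options.setParallelism(2);
    System.out.println(options.getParallelism());
  }
}
{code}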



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5396) Flink portable runner savepoint / upgrade support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5396?focusedWorklogId=183942&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183942
 ]

ASF GitHub Bot logged work on BEAM-5396:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:25
Start Date: 10/Jan/19 21:25
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7362: [BEAM-5396] Assign 
portable operator uids
URL: https://github.com/apache/beam/pull/7362#issuecomment-453259679
 
 
   Tested with the original pipeline and the restore error is gone. Thanks!
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183942)
Time Spent: 3h 50m  (was: 3h 40m)

> Flink portable runner savepoint / upgrade support
> -
>
> Key: BEAM-5396
> URL: https://issues.apache.org/jira/browse/BEAM-5396
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to support Flink savepoints for production 
> use. It should be possible to upgrade a stateful portable Beam pipeline that 
> runs on Flink, which involves taking a savepoint and then starting the new 
> version of the pipeline from that savepoint. The potential issues with 
> pipeline evolution and migration are similar to those when using the Flink 
> DataStream API (schema / name changes etc.).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183943&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183943
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:25
Start Date: 10/Jan/19 21:25
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453259702
 
 
   Test time went down notably; streaming and batch execution are failing for
   these tests:
   
   ```
   
org.apache.beam.sdk.transforms.ParDoTest$BasicTests.testPipelineOptionsParameter
   
org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testPipelineOptionsParameterOnTimer
   ```
   
   Need to check why they fail in EMBEDDED mode.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183943)
Time Spent: 2h 40m  (was: 2.5h)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-6326) Fix test failures in streaming mode of PortableValidatesRunner

2019-01-10 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed BEAM-6326.

   Resolution: Fixed
Fix Version/s: 2.11.0
   2.10.0

> Fix test failures in streaming mode of PortableValidatesRunner
> --
>
> Key: BEAM-6326
> URL: https://issues.apache.org/jira/browse/BEAM-6326
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0, 2.11.0
>
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As of BEAM-6009, the tests are run separately for batch and streaming. This 
> has revealed issues with a couple of tests which need to be addressed.
> The Gradle task is: {{validatesPortableRunnerStreaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5396) Flink portable runner savepoint / upgrade support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5396?focusedWorklogId=183939&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183939
 ]

ASF GitHub Bot logged work on BEAM-5396:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:20
Start Date: 10/Jan/19 21:20
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7362: [BEAM-5396] Assign 
portable operator uids
URL: https://github.com/apache/beam/pull/7362#issuecomment-453258000
 
 
   Run Python Flink ValidatesRunner
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183939)
Time Spent: 3h 40m  (was: 3.5h)

> Flink portable runner savepoint / upgrade support
> -
>
> Key: BEAM-5396
> URL: https://issues.apache.org/jira/browse/BEAM-5396
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to support Flink savepoints for production 
> use. It should be possible to upgrade a stateful portable Beam pipeline that 
> runs on Flink, which involves taking a savepoint and then starting the new 
> version of the pipeline from that savepoint. The potential issues with 
> pipeline evolution and migration are similar to those when using the Flink 
> DataStream API (schema / name changes etc.).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6248) Add Flink 1.7.x build target to Flink Runner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6248?focusedWorklogId=183940&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183940
 ]

ASF GitHub Bot logged work on BEAM-6248:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:20
Start Date: 10/Jan/19 21:20
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7300: [BEAM-6248] Add Flink 
v1.7 build target to Flink Runner 
URL: https://github.com/apache/beam/pull/7300#issuecomment-453258075
 
 
   Backported to `release-2.10.0`.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183940)
Time Spent: 5h  (was: 4h 50m)

> Add Flink 1.7.x build target to Flink Runner
> 
>
> Key: BEAM-6248
> URL: https://issues.apache.org/jira/browse/BEAM-6248
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0, 2.11.0
>
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> With BEAM-5419 we can add a Flink 1.7.x build target.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (BEAM-6248) Add Flink 1.7.x build target to Flink Runner

2019-01-10 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed BEAM-6248.

   Resolution: Fixed
Fix Version/s: 2.11.0

> Add Flink 1.7.x build target to Flink Runner
> 
>
> Key: BEAM-6248
> URL: https://issues.apache.org/jira/browse/BEAM-6248
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0, 2.11.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> With BEAM-5419 we can add a Flink 1.7.x build target.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5396) Flink portable runner savepoint / upgrade support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5396?focusedWorklogId=183934&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183934
 ]

ASF GitHub Bot logged work on BEAM-5396:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:09
Start Date: 10/Jan/19 21:09
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7362: [BEAM-5396] Assign 
portable operator uids
URL: https://github.com/apache/beam/pull/7362#issuecomment-453254528
 
 
   @tweise Rebased to latest master.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183934)
Time Spent: 3.5h  (was: 3h 20m)

> Flink portable runner savepoint / upgrade support
> -
>
> Key: BEAM-5396
> URL: https://issues.apache.org/jira/browse/BEAM-5396
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Thomas Weise
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner needs to support Flink savepoints for production 
> use. It should be possible to upgrade a stateful portable Beam pipeline that 
> runs on Flink, which involves taking a savepoint and then starting the new 
> version of the pipeline from that savepoint. The potential issues with 
> pipeline evolution and migration are similar to those when using the Flink 
> DataStream API (schema / name changes etc.).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6248) Add Flink 1.7.x build target to Flink Runner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6248?focusedWorklogId=183930&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183930
 ]

ASF GitHub Bot logged work on BEAM-6248:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:02
Start Date: 10/Jan/19 21:02
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7300: [BEAM-6248] Add Flink 
v1.7 build target to Flink Runner 
URL: https://github.com/apache/beam/pull/7300#issuecomment-453252493
 
 
   @mxm please backport and resolve JIRA: 
https://issues.apache.org/jira/browse/BEAM-6248
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183930)
Time Spent: 4h 50m  (was: 4h 40m)

> Add Flink 1.7.x build target to Flink Runner
> 
>
> Key: BEAM-6248
> URL: https://issues.apache.org/jira/browse/BEAM-6248
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> With BEAM-5419 we can add a Flink 1.7.x build target.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183931
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:04
Start Date: 10/Jan/19 21:04
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453252958
 
 
   Just noticed we can further decrease the build time by removing the 
dependency of the task on `:beam-sdks-java-container:docker`.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183931)
Time Spent: 2h 20m  (was: 2h 10m)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6382) SamzaRunner: add an option to read configs using a user-defined factory

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6382?focusedWorklogId=183929&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183929
 ]

ASF GitHub Bot logged work on BEAM-6382:


Author: ASF GitHub Bot
Created on: 10/Jan/19 21:01
Start Date: 10/Jan/19 21:01
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7464: 
[BEAM-6382] Cherry pick pr #7443 into 2.10.0 release branch
URL: https://github.com/apache/beam/pull/7464
 
 
   Cherry-pick of [BEAM-6382] (SamzaRunner: add an option to read configs
   using a user-defined factory) into the next release branch.
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch): the standard badge table of
   Jenkins post-commit job links (Go, Java, and Python across the Apex,
   Dataflow, Flink, Gearpump, Samza, and Spark runners) is omitted here.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183929)
Time Spent: 1h 10m  (was: 1h)

[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183921&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183921
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:43
Start Date: 10/Jan/19 20:43
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453246488
 
 
   Run Java Flink PortableValidatesRunner Streaming
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183921)
Time Spent: 1h 40m  (was: 1.5h)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183924&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183924
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:44
Start Date: 10/Jan/19 20:44
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453246576
 
 
   CC @tweise @angoenka 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183924)
Time Spent: 2h 10m  (was: 2h)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183923&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183923
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:44
Start Date: 10/Jan/19 20:44
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453246576
 
 
   CC @tweise 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183923)
Time Spent: 2h  (was: 1h 50m)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183922&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183922
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:44
Start Date: 10/Jan/19 20:44
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461#issuecomment-453246514
 
 
   Run Java Flink PortableValidatesRunner Batch
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183922)
Time Spent: 1h 50m  (was: 1h 40m)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183920
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:43
Start Date: 10/Jan/19 20:43
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #7461: [BEAM-6405] Let 
PortableValidatesRunner tests run in EMBEDDED environment
URL: https://github.com/apache/beam/pull/7461
 
 
   We have seen repeated OOM errors on the Jenkins machines. We could
   potentially reduce the number of parallel tests further, but the test
   runtime is already about an hour. Using the EMBEDDED environment should
   give us fewer memory problems and faster test execution.
   
   Note that EMBEDDED makes sense for these tests because they are purely
   Java-based.
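   
   As a rough illustration of the knob involved (a hedged sketch assuming the
   `defaultEnvironmentType` flag on `PortablePipelineOptions`; the real change
   lives in the Gradle test configuration, not user code):
   
   ```java
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.options.PortablePipelineOptions;
   
   public class EmbeddedEnvironmentExample {
     public static void main(String[] args) {
       // Select the in-process Java SDK harness instead of a Docker container,
       // avoiding per-test container startup and its memory overhead.
       PortablePipelineOptions options =
           PipelineOptionsFactory.fromArgs("--defaultEnvironmentType=EMBEDDED")
               .as(PortablePipelineOptions.class);
       System.out.println(options.getDefaultEnvironmentType());
     }
   }
   ```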
   
   Post-Commit Tests Status (on master branch): the standard badge table of
   Jenkins post-commit job links (Go, Java, and Python across the Apex,
   Dataflow, Flink, Gearpump, Samza, and Spark runners) is omitted here.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183920)
Time Spent: 1.5h  (was: 1h 20m)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The PVR tests seem to be passing fine and then failing consecutively for no 
> reason: https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. number of available cores, is 
> responsible for the flakiness if there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Work logged] (BEAM-6248) Add Flink 1.7.x build target to Flink Runner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6248?focusedWorklogId=183919&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183919
 ]

ASF GitHub Bot logged work on BEAM-6248:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:41
Start Date: 10/Jan/19 20:41
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #7300: [BEAM-6248] Add 
Flink v1.7 build target to Flink Runner 
URL: https://github.com/apache/beam/pull/7300
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183919)
Time Spent: 4h 40m  (was: 4.5h)

> Add Flink 1.7.x build target to Flink Runner
> 
>
> Key: BEAM-6248
> URL: https://issues.apache.org/jira/browse/BEAM-6248
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> With BEAM-5419 we can add a Flink 1.7.x build target.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6248) Add Flink 1.7.x build target to Flink Runner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6248?focusedWorklogId=183909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183909
 ]

ASF GitHub Bot logged work on BEAM-6248:


Author: ASF GitHub Bot
Created on: 10/Jan/19 20:11
Start Date: 10/Jan/19 20:11
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7300: [BEAM-6248] Add Flink 
v1.7 build target to Flink Runner 
URL: https://github.com/apache/beam/pull/7300#issuecomment-453236625
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183909)
Time Spent: 4.5h  (was: 4h 20m)

> Add Flink 1.7.x build target to Flink Runner
> 
>
> Key: BEAM-6248
> URL: https://issues.apache.org/jira/browse/BEAM-6248
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> With BEAM-5419 we can add a Flink 1.7.x build target.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6294) Use Flink's redistribute for reshuffle.

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6294?focusedWorklogId=183897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183897
 ]

ASF GitHub Bot logged work on BEAM-6294:


Author: ASF GitHub Bot
Created on: 10/Jan/19 19:51
Start Date: 10/Jan/19 19:51
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #7457: [BEAM-6294] 
Ensure input and output coders are equal for reshuffle transforms.
URL: https://github.com/apache/beam/pull/7457
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183897)
Time Spent: 1h 50m  (was: 1h 40m)

> Use Flink's redistribute for reshuffle.
> ---
>
> Key: BEAM-6294
> URL: https://issues.apache.org/jira/browse/BEAM-6294
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink, sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Python needs to publish the URN over the FnAPI which is pretty easy, but 
> Flink also needs to ensure that the composite structure does not get fused. 
> Unlike with GBK, we can't assume all runners implement this as a primitive. 
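
For readers unfamiliar with the transform: Reshuffle is expressed in the SDKs
as a composite built on GroupByKey, and the point above is that a runner which
recognizes its URN can replace the whole composite with a native
redistribution. A minimal Java SDK usage sketch:

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> words = p.apply(Create.of("a", "b", "c"));
    // Breaks fusion and redistributes elements across workers; on Flink this
    // can become a native redistribute rather than a GroupByKey round trip.
    words.apply(Reshuffle.viaRandomKey());

    p.run().waitUntilFinish();
  }
}
{code}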



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183883&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183883
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 19:35
Start Date: 10/Jan/19 19:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #7433: [BEAM-6280] 
Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246892478
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -102,42 +98,13 @@ def Cancel(self, request, context=None):
 
   def GetStateStream(self, request, context=None):
 job = self._jobs[request.job_id]
-state_queue = queue.Queue()
-job.add_state_change_callback(state_queue.put)
-try:
-  current_state = state_queue.get()
-except queue.Empty:
-  current_state = job.state
-yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
-while current_state not in TERMINAL_STATES:
-  current_state = state_queue.get(block=True)
-  yield beam_job_api_pb2.GetJobStateResponse(state=current_state)
+for state in job.GetStateStream():
+  yield state
 
 Review comment:
   I think the logic of turning a state into a GetJobStateResponse belongs 
here, not in BeamJob. Similarly for messages. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183883)
Time Spent: 2h 10m  (was: 2h)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> [https://scans.gradle.com/s/do3hjulee3gaa/console-log?task=:beam-sdks-python:testPython3]
> {code:java}
> 'second' not found in 'Traceback (most recent call last):\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 466, in test_error_traceback_includes_user_code\np | 
> beam.Create([0]) | beam.Map(first)  # pylint: 
> disable=expression-not-assigned\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/pipeline.py",
>  line 425, in __exit__\nself.run().wait_until_finish()\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/portable_runner.py",
>  line 314, in wait_until_finish\nself._job_id, self._state, 
> self._last_error_message()))\nRuntimeError: Pipeline 
> job-cdcefe6d-1caa-4487-9e63-e971f67ec68c failed in state FAILED: start 
>  coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>\n'{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=183884&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183884
 ]

ASF GitHub Bot logged work on BEAM-6392:


Author: ASF GitHub Bot
Created on: 10/Jan/19 19:36
Start Date: 10/Jan/19 19:36
Worklog Time Spent: 10m 
  Work Description: kanterov commented on issue #7441: [BEAM-6392] Add 
support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-453176331
 
 
   @kmjung this is awesome; I've been waiting for this feature. Is there any 
documentation for the BigQuery direct read API? I couldn't find any.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183884)
Time Spent: 50m  (was: 40m)

> Add support for new BigQuery streaming read API to BigQueryIO
> -
>
> Key: BEAM-6392
> URL: https://issues.apache.org/jira/browse/BEAM-6392
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Kenneth Jung
>Assignee: Kenneth Jung
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> BigQuery has developed a new streaming egress API which will soon reach 
> public availability. Add support for the new API in BigQueryIO.
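
For a sense of the API shape this work points toward, a hedged sketch based on
the read method that surfaced in BigQueryIO (the table name is hypothetical):

{code:java}
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.values.PCollection;

public class DirectReadSketch {
  public static PCollection<TableRow> buildRead(Pipeline p) {
    // DIRECT_READ streams rows over the new BigQuery egress API instead of
    // staging an export to GCS first.
    return p.apply(
        BigQueryIO.readTableRows()
            .from("my-project:my_dataset.my_table") // hypothetical table
            .withMethod(Method.DIRECT_READ));
  }
}
{code}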



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183882
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 19:35
Start Date: 10/Jan/19 19:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #7433: [BEAM-6280] 
Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246893622
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-   job_id,
-   pipeline_options,
-   pipeline_proto):
+  def __init__(self, pipeline_proto):
 super(BeamJob, self).__init__()
-self._job_id = job_id
-self._pipeline_options = pipeline_options
 self._pipeline_proto = pipeline_proto
 self._state = None
-self._state_change_callbacks = []
-self._last_log_message = None
-self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+self._logs = []
 
 Review comment:
   I still don't think that we should be storing all logs. What if we allowed 
connecting *before* the job is started to ensure all logs were seen. Otherwise, 
one gets the logs from the point at which one connected (which better matches 
the intention). 
   
   Similarly for states, as long as one gets a response for the current state, 
there's no need to replay all past states. In this case we would go back to the 
callback logic, but maybe remove the last_log_message which is the fishy one, 
and fix the runner to start listening to logs before it starts the job (which 
we'll want to do for other jobs servers as well). 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183882)
Time Spent: 2h  (was: 1h 50m)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> [https://scans.gradle.com/s/do3hjulee3gaa/console-log?task=:beam-sdks-python:testPython3]
> {code:java}
> 'second' not found in 'Traceback (most recent call last):\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 466, in test_error_traceback_includes_user_code\np | 
> beam.Create([0]) | beam.Map(first)  # pylint: 
> disable=expression-not-assigned\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/pipeline.py",
>  line 425, in __exit__\nself.run().wait_until_finish()\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/portable_runner.py",
>  line 314, in wait_until_finish\nself._job_id, self._state, 
> self._last_error_message()))\nRuntimeError: Pipeline 
> job-cdcefe6d-1caa-4487-9e63-e971f67ec68c failed in state FAILED: start 
>  coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>\n'{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6241) MongoDbIO - Add Limit and Aggregates Support

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6241?focusedWorklogId=183875&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183875
 ]

ASF GitHub Bot logged work on BEAM-6241:


Author: ASF GitHub Bot
Created on: 10/Jan/19 19:20
Start Date: 10/Jan/19 19:20
Worklog Time Spent: 10m 
  Work Description: sandboxws commented on issue #7293: [BEAM-6241] Added 
limit and aggregates support to MongoDbIO
URL: https://github.com/apache/beam/pull/7293#issuecomment-453219896
 
 
   @iemejia I see your point more clearly now; I'm sold. I'll hack on this 
during the weekend. In the meantime, is there a good source on using 
`SerializableFunction`, or should I just poke around the source code?
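   
   For reference, `SerializableFunction` is Beam's serializable single-method
   function interface, so the suggestion amounts to passing the query logic in
   as one function. A hedged sketch of that shape (illustrative names, not the
   API that ultimately shipped):
   
   ```java
   import com.mongodb.client.FindIterable;
   import com.mongodb.client.MongoCollection;
   import com.mongodb.client.model.Filters;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   import org.bson.Document;
   
   public class QueryFnSketch {
     // Instead of many withXxx() setters, the read could accept one function
     // that turns a collection handle into a configured query.
     static final SerializableFunction<MongoCollection<Document>, FindIterable<Document>>
         QUERY_FN =
             collection -> collection.find(Filters.eq("scientist", "Einstein")).limit(5);
   }
   ```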
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183875)
Time Spent: 1.5h  (was: 1h 20m)

> MongoDbIO - Add Limit and Aggregates Support
> 
>
> Key: BEAM-6241
> URL: https://issues.apache.org/jira/browse/BEAM-6241
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-mongodb
>Affects Versions: 2.9.0
>Reporter: Ahmed El.Hussaini
>Assignee: Ahmed El.Hussaini
>Priority: Major
>  Labels: easyfix
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> h2. Adds Support to Limit Results
>  
> {code:java}
> pipeline.apply(
> MongoDbIO.read()
> .withUri("mongodb://localhost:" + port)
> .withDatabase(DATABASE)
> .withCollection(COLLECTION)
> .withFilter("{\"scientist\":\"Einstein\"}")
> .withLimit(5));
> {code}
> h2. Adds Support to Use Aggregates
>  
> {code:java}
> List<BsonDocument> aggregates = new ArrayList<>();
>   aggregates.add(
> new BsonDocument(
>   "$match",
>   new BsonDocument("country", new BsonDocument("$eq", new 
> BsonString("England")))));
> PCollection<Document> output =
>   pipeline.apply(
> MongoDbIO.read()
>   .withUri("mongodb://localhost:" + port)
>   .withDatabase(DATABASE)
>   .withCollection(COLLECTION)
>   .withAggregate(aggregates));
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6248) Add Flink 1.7.x build target to Flink Runner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6248?focusedWorklogId=183870&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183870
 ]

ASF GitHub Bot logged work on BEAM-6248:


Author: ASF GitHub Bot
Created on: 10/Jan/19 19:16
Start Date: 10/Jan/19 19:16
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7300: [BEAM-6248] Add Flink 
v1.7 build target to Flink Runner 
URL: https://github.com/apache/beam/pull/7300#issuecomment-453218573
 
 
   Run Java PreCommit
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183870)
Time Spent: 4h 20m  (was: 4h 10m)

> Add Flink 1.7.x build target to Flink Runner
> 
>
> Key: BEAM-6248
> URL: https://issues.apache.org/jira/browse/BEAM-6248
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> With BEAM-5419 we can add a Flink 1.7.x build target.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-6352) Watch PTransform is broken

2019-01-10 Thread Boyuan Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739685#comment-16739685
 ] 

Boyuan Zhang commented on BEAM-6352:


Just want to follow up to confirm whether this issue is a blocker for the 
current release (2.10.0).

> Watch PTransform is broken
> --
>
> Key: BEAM-6352
> URL: https://issues.apache.org/jira/browse/BEAM-6352
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.9.0
>Reporter: Gleb Kanterov
>Assignee: Boyuan Zhang
>Priority: Blocker
> Fix For: 2.10.0
>
>
> List of affected tests:
> org.apache.beam.sdk.transforms.WatchTest > 
> testSinglePollMultipleInputsWithSideInput FAILED
> org.apache.beam.sdk.transforms.WatchTest > testMultiplePollsWithKeyExtractor 
> FAILED
> org.apache.beam.sdk.transforms.WatchTest > testSinglePollMultipleInputs FAILED
> org.apache.beam.sdk.transforms.WatchTest > 
> testMultiplePollsWithTerminationDueToTerminationCondition FAILED
> org.apache.beam.sdk.transforms.WatchTest > testMultiplePollsWithManyResults 
> FAILED
> org.apache.beam.sdk.transforms.WatchTest > testSinglePollWithManyResults 
> FAILED
> org.apache.beam.sdk.transforms.WatchTest > 
> testMultiplePollsStopAfterTimeSinceNewOutput FAILED
> org.apache.beam.sdk.transforms.WatchTest > 
> testMultiplePollsWithTerminationBecauseOutputIsFinal FAILED
> org.apache.beam.sdk.io.AvroIOTest$NeedsRunnerTests > 
> testContinuouslyWriteAndReadMultipleFilepatterns[0: true] FAILED
> org.apache.beam.sdk.io.AvroIOTest$NeedsRunnerTests > 
> testContinuouslyWriteAndReadMultipleFilepatterns[1: false] FAILED
> org.apache.beam.sdk.io.FileIOTest > testMatchWatchForNewFiles FAILED
> org.apache.beam.sdk.io.TextIOReadTest$BasicIOTest > testReadWatchForNewFiles 
> FAILED
> {code}
> java.lang.IllegalArgumentException: 
> org.apache.beam.sdk.transforms.Watch$WatchGrowthFn, @ProcessElement 
> process(ProcessContext, GrowthTracker): Has tracker type 
> Watch.GrowthTracker, but the DoFn's tracker 
> type must be of type RestrictionTracker.
> {code}
> Relevant pull requests:
> - https://github.com/apache/beam/pull/6467
> - https://github.com/apache/beam/pull/7374
> Now tests are marked with @Ignore referencing this JIRA issue
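
The quoted error is a DoFn signature-validation failure: the splittable
{{@ProcessElement}} method declares a concrete tracker subclass where the
validator requires the RestrictionTracker base type. A minimal sketch of the
accepted shape (an illustrative DoFn, hedged against the API direction of the
pull requests above):

{code:java}
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class ExampleSplittableFn extends DoFn<String, String> {
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String element) {
    return new OffsetRange(0, element.length());
  }

  @ProcessElement
  public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    // The parameter is declared as RestrictionTracker, not a subclass.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      c.output(c.element().substring((int) i, (int) i + 1));
    }
  }
}
{code}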



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-6355) [beam_PostRelease_NightlySnapshot] [runQuickstartJavaDataflow] ExceptionInInitializerError

2019-01-10 Thread Boyuan Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyuan Zhang resolved BEAM-6355.

   Resolution: Fixed
Fix Version/s: Not applicable

> [beam_PostRelease_NightlySnapshot] [runQuickstartJavaDataflow] 
> ExceptionInInitializerError
> --
>
> Key: BEAM-6355
> URL: https://issues.apache.org/jira/browse/BEAM-6355
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow, test-failures
>Reporter: Scott Wegner
>Assignee: Boyuan Zhang
>Priority: Major
>  Labels: currently-failing
> Fix For: Not applicable
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> _Use this form to file an issue for test failure:_
>  * [Jenkins 
> Job|https://builds.apache.org/job/beam_PostRelease_NightlySnapshot/480/]
>  * [Dataflow 
> Job|https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2019-01-03_03_07_12-9648275706628895259?project=apache-beam-testing=433637338589]
>  * [Test source 
> code|https://github.com/apache/beam/blob/master/release/src/main/groovy/quickstart-java-dataflow.groovy]
> Initial investigation:
> It appears the Dataflow job failed during worker initialization. From the 
> [stackdriver 
> logs|https://console.cloud.google.com/logs/viewer?resource=dataflow_step%2Fjob_id%2F2019-01-03_03_07_12-9648275706628895259=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fworker=NO_LIMIT=apache-beam-testing=433637338589=500=false=2019-01-03T16:52:14.64300Z==true=2019-01-03T11:08:49.88200Z],
>  I see:
> {code}
> 019-01-03 03:08:27.770 PST
> Uncaught exception occurred during work unit execution. This will be retried.
> {
>  insertId:  "3832125194122580497:879173:0:62501"  
>  jsonPayload: {
>   exception:  "java.lang.ExceptionInInitializerError
>   at 
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$2.typedApply(IntrinsicMapTaskExecutorFactory.java:344)
>   at 
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$2.typedApply(IntrinsicMapTaskExecutorFactory.java:338)
>   at 
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
>   at 
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
>   at 
> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
>   at 
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:120)
>   at 
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:337)
>   at 
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:291)
>   at 
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
>   at 
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
>   at 
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: Multiple entries with same 
> key: 
> kind:varint=org.apache.beam.runners.dataflow.util.CloudObjectTranslators$8@f4dd50c
>  and 
> kind:varint=org.apache.beam.runners.dataflow.worker.RunnerHarnessCoderCloudObjectTranslatorRegistrar$1@ae1551d
>   at 
> org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.collect.ImmutableMap.checkNoConflict(ImmutableMap.java:136)
>   at 
> org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.collect.RegularImmutableMap.checkNoConflictInKeyBucket(RegularImmutableMap.java:100)
>   at 
> org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.collect.RegularImmutableMap.fromEntryArray(RegularImmutableMap.java:86)
>   at 
> org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.collect.ImmutableMap$Builder.build(ImmutableMap.java:300)
>   at 
> org.apache.beam.runners.dataflow.util.CloudObjects.populateCloudObjectTranslators(CloudObjects.java:60)
>   at 
> org.apache.beam.runners.dataflow.util.CloudObjects.<clinit>(CloudObjects.java:39)
>   ... 15 more
> "   
>   job:  
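The failure mode above comes from Guava's ImmutableMap.Builder, which rejects duplicate keys at build() time; because the translator map is assembled in a static initializer, the IllegalArgumentException surfaces as the ExceptionInInitializerError seen in the trace. A minimal sketch of that mechanism, assuming only plain Guava (the key and value strings are illustrative, taken from the trace):

{code:java}
import com.google.common.collect.ImmutableMap;

public class DuplicateTranslatorDemo {
  public static void main(String[] args) {
    ImmutableMap.Builder<String, String> translators = ImmutableMap.builder();
    // Two registrars both claim the "kind:varint" coder kind.
    translators.put("kind:varint", "CloudObjectTranslators$8");
    translators.put("kind:varint", "RunnerHarnessCoderCloudObjectTranslatorRegistrar$1");
    // build() throws IllegalArgumentException: "Multiple entries with same key";
    // when this runs inside a class's static initializer, callers see
    // java.lang.ExceptionInInitializerError instead.
    translators.build();
  }
}
{code}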

[jira] [Work logged] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6184?focusedWorklogId=183842&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183842
 ]

ASF GitHub Bot logged work on BEAM-6184:


Author: ASF GitHub Bot
Created on: 10/Jan/19 18:21
Start Date: 10/Jan/19 18:21
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #7454: [BEAM-6184] Enforce 
javadoc check on non-trivial public methods
URL: https://github.com/apache/beam/pull/7454#issuecomment-453200155
 
 
   R: @robertwb  @kennknowles 
   
   CC: @swegner 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183842)
Time Spent: 1h  (was: 50m)

> PortableRunner dependency missed in wordcount example maven artifact
> 
>
> Key: BEAM-6184
> URL: https://issues.apache.org/jira/browse/BEAM-6184
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Reporter: Ruoyun Huang
>Assignee: Ruoyun Huang
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
>  
>  
> more context: 
> https://lists.apache.org/thread.html/8dd60395424425f7502d62888c49014430d1d3b06c026606f3db28ab@%3Cuser.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6326) Fix test failures in streaming mode of PortableValidatesRunner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6326?focusedWorklogId=183840&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183840
 ]

ASF GitHub Bot logged work on BEAM-6326:


Author: ASF GitHub Bot
Created on: 10/Jan/19 18:15
Start Date: 10/Jan/19 18:15
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7453: [BEAM-6326] Fix portable 
stateful processing with side input
URL: https://github.com/apache/beam/pull/7453#issuecomment-453197956
 
 
   We need to backport this to release-2.10
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183840)
Time Spent: 3h 10m  (was: 3h)

> Fix test failures in streaming mode of PortableValidatesRunner
> --
>
> Key: BEAM-6326
> URL: https://issues.apache.org/jira/browse/BEAM-6326
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> As of BEAM-6009, the tests are run separately for batch and streaming. This 
> has revealed issues with a couple of tests which need to be addressed.
> The Gradle task is: {{validatesPortableRunnerStreaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6326) Fix test failures in streaming mode of PortableValidatesRunner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6326?focusedWorklogId=183839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183839
 ]

ASF GitHub Bot logged work on BEAM-6326:


Author: ASF GitHub Bot
Created on: 10/Jan/19 18:12
Start Date: 10/Jan/19 18:12
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #7453: [BEAM-6326] Fix 
portable stateful processing with side input
URL: https://github.com/apache/beam/pull/7453#discussion_r246866056
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -632,10 +635,38 @@ private void translateStreamingImpulse(
     if (transformedSideInputs.unionTagToView.isEmpty()) {
       outputStream = inputDataStream.transform(operatorName, outputTypeInformation, doFnOperator);
     } else {
-      outputStream =
-          inputDataStream
-              .connect(transformedSideInputs.unionedSideInputs.broadcast())
-              .transform(operatorName, outputTypeInformation, doFnOperator);
+      DataStream sideInputStream =
+          transformedSideInputs.unionedSideInputs.broadcast();
+      if (stateful) {
+        // We have to manually construct the two-input transform because we're not
+        // allowed to have only one input keyed, normally. Since Flink 1.5.0 it's
+        // possible to use the Broadcast State Pattern which provides a more elegant
 
 Review comment:
   I don't think it is worth the effort. We have all the logic already in place 
and we wouldn't gain anything from using the Broadcast State Pattern. Good 
point on keyed side inputs; the current approach is generally much more 
flexible. 
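For context, the Broadcast State Pattern discussed above (available since Flink 1.5) connects a keyed main stream with a broadcast side stream roughly as in the sketch below. This is a minimal illustration under assumed element types and a hypothetical state descriptor, not the operator wiring Beam actually uses:

{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastSideInputSketch {
  static final MapStateDescriptor<String, String> SIDE_INPUT_STATE =
      new MapStateDescriptor<>(
          "side-input", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

  static DataStream<String> apply(KeyedStream<String, String> mainInput,
                                  DataStream<String> sideInput) {
    // Broadcast the side input; Flink replicates it to every parallel instance.
    BroadcastStream<String> broadcast = sideInput.broadcast(SIDE_INPUT_STATE);
    return mainInput
        .connect(broadcast)
        .process(
            new KeyedBroadcastProcessFunction<String, String, String, String>() {
              @Override
              public void processElement(String value, ReadOnlyContext ctx,
                                         Collector<String> out) throws Exception {
                // Keyed main-input element; read-only view of the broadcast state.
                out.collect(value + "/" + ctx.getBroadcastState(SIDE_INPUT_STATE).get("k"));
              }

              @Override
              public void processBroadcastElement(String value, Context ctx,
                                                  Collector<String> out) throws Exception {
                // Broadcast element; writable state, mirrored on all parallel tasks.
                ctx.getBroadcastState(SIDE_INPUT_STATE).put("k", value);
              }
            });
  }
}
{code}

The trade-off noted in the thread: broadcast state is replicated and unkeyed, so a hypothetical keyed side input would not fit this pattern, whereas the manually constructed two-input transform can evolve in that direction.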
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183839)
Time Spent: 3h  (was: 2h 50m)

> Fix test failures in streaming mode of PortableValidatesRunner
> --
>
> Key: BEAM-6326
> URL: https://issues.apache.org/jira/browse/BEAM-6326
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> As of BEAM-6009, the tests are run separately for batch and streaming. This 
> has revealed issues with a couple of tests which need to be addressed.
> The Gradle task is: {{validatesPortableRunnerStreaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183838&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183838
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 18:11
Start Date: 10/Jan/19 18:11
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246865795
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_test.py
 ##
 @@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtable_io_write import BigtableWriteConfiguration
+from apache_beam.io.gcp.bigtable_io_write import WriteToBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+
+
+class GenerateDirectRows(beam.DoFn):
+  """Generates DirectRow objects for the Beam pipeline to process."""
+
+  def process(self, row_values):
+    """Builds a DirectRow from a single element.
+
+    :type row_values: dict
+    :param row_values: dict with a row_key and a row_content list, each
+      entry carrying the column_family_id, column_id, and value of a cell.
+    """
+    direct_row = row.DirectRow(row_key=row_values["row_key"])
+
+    for row_value in row_values["row_content"]:
+      direct_row.set_cell(row_value["column_family_id"],
+                          row_value["column_id"],
+                          row_value["value"],
+                          datetime.datetime.now())
+
+    # Emit the populated row so downstream transforms receive it.
+    yield direct_row
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteIT(unittest.TestCase):
+  """ Bigtable Write Connector Test
+
+  """
+  DEFAULT_TABLE_PREFIX = "python-test"
+  instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  number = 500
+  LOCATION_ID = "us-east1-b"
+
+  def setUp(self):
+    try:
+      from google.cloud.bigtable import enums
+      self.STORAGE_TYPE = enums.StorageType.HDD
+    except ImportError:
+      self.STORAGE_TYPE = 2
+
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+    self.client = Client(project=self.project, admin=True)
+    self._create_instance_table()
+
+  def tearDown(self):
+    instance = self.client.instance(self.instance_id)
+    if instance.exists():
+      instance.delete()
+
+  def test_bigtable_write_python(self):
+    number = self.number
+    config = BigtableWriteConfiguration(self.project, self.instance_id,
+                                        self.table_id)
+    pipeline_args = self.test_pipeline.options_list
+    pipeline_options = PipelineOptions(pipeline_args)
+
+    row_values = self._generate_mutation_data(number)
+
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+      _ = (
+          pipeline
+          | 'Generate Row Values' >> beam.Create(row_values)
+          | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+          | 'Write to BT' >> beam.ParDo(WriteToBigtable(config)))
+
+      result = pipeline.run()
+      result.wait_until_finish()
+
+      assert result.state == PipelineState.DONE
+
+      if not hasattr(result, 'has_job') or result.has_job:
+        read_filter = MetricsFilter().with_name('Written Row')
+   

[jira] [Work logged] (BEAM-6280) Failure in PortableRunnerTest.test_error_traceback_includes_user_code

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6280?focusedWorklogId=183828&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183828
 ]

ASF GitHub Bot logged work on BEAM-6280:


Author: ASF GitHub Bot
Created on: 10/Jan/19 17:43
Start Date: 10/Jan/19 17:43
Worklog Time Spent: 10m 
  Work Description: rohdesamuel commented on pull request #7433: 
[BEAM-6280] Refactors Python portability tests to be multi-threaded aware
URL: https://github.com/apache/beam/pull/7433#discussion_r246856679
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/local_job_service.py
 ##
 @@ -185,55 +152,89 @@ class BeamJob(threading.Thread):
 The current state of the pipeline is available as self.state.
 """
 
-  def __init__(self,
-               job_id,
-               pipeline_options,
-               pipeline_proto):
+  def __init__(self, pipeline_proto):
     super(BeamJob, self).__init__()
-    self._job_id = job_id
-    self._pipeline_options = pipeline_options
     self._pipeline_proto = pipeline_proto
     self._state = None
-    self._state_change_callbacks = []
-    self._last_log_message = None
-    self._log_callbacks = [lambda msg: setattr(self, '_last_log_message', msg)]
+    self._logs = []
 
 Review comment:
   Since the user can start the stream at any time and we don't store all the 
logs, there would be no guarantee that the user sees their intended message. 
We need to store all messages and send all of them to each message stream.
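Stated generically, the buffering scheme being asked for keeps every log message for the lifetime of the job and replays the full history to each subscriber before tailing live output. A minimal sketch of that pattern (written in Java for illustration; the job service under review is Python, and these names are hypothetical):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ReplayingLogBuffer {
  private final List<String> logs = new ArrayList<>();
  private final List<Consumer<String>> subscribers = new ArrayList<>();

  // Record a message and fan it out to everyone currently subscribed.
  public synchronized void publish(String message) {
    logs.add(message);
    for (Consumer<String> subscriber : subscribers) {
      subscriber.accept(message);
    }
  }

  // A late subscriber first receives the entire history, then live messages,
  // so no message emitted before it attached can be missed.
  public synchronized void subscribe(Consumer<String> subscriber) {
    for (String message : logs) {
      subscriber.accept(message);
    }
    subscribers.add(subscriber);
  }
}
{code}

The cost is unbounded memory in the log list; a bounded buffer would trade that for exactly the missed-message risk described above.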
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183828)
Time Spent: 1h 50m  (was: 1h 40m)

> Failure in PortableRunnerTest.test_error_traceback_includes_user_code
> -
>
> Key: BEAM-6280
> URL: https://issues.apache.org/jira/browse/BEAM-6280
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core, test-failures
>Reporter: Kenneth Knowles
>Assignee: Sam Rohde
>Priority: Critical
>  Labels: flaky-test
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/]
> [https://builds.apache.org/job/beam_PreCommit_Python_Cron/732/testReport/apache_beam.runners.portability.portable_runner_test/PortableRunnerTest/test_error_traceback_includes_user_code/]
> [https://scans.gradle.com/s/do3hjulee3gaa/console-log?task=:beam-sdks-python:testPython3]
> {code:java}
> 'second' not found in 'Traceback (most recent call last):\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py",
>  line 466, in test_error_traceback_includes_user_code\np | 
> beam.Create([0]) | beam.Map(first)  # pylint: 
> disable=expression-not-assigned\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/pipeline.py",
>  line 425, in __exit__\nself.run().wait_until_finish()\n  File 
> "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/apache_beam/runners/portability/portable_runner.py",
>  line 314, in wait_until_finish\nself._job_id, self._state, 
> self._last_error_message()))\nRuntimeError: Pipeline 
> job-cdcefe6d-1caa-4487-9e63-e971f67ec68c failed in state FAILED: start 
>  coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>\n'{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6326) Fix test failures in streaming mode of PortableValidatesRunner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6326?focusedWorklogId=183821&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183821
 ]

ASF GitHub Bot logged work on BEAM-6326:


Author: ASF GitHub Bot
Created on: 10/Jan/19 17:25
Start Date: 10/Jan/19 17:25
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #7453: [BEAM-6326] Fix 
portable stateful processing with side input
URL: https://github.com/apache/beam/pull/7453
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183821)
Time Spent: 2h 50m  (was: 2h 40m)

> Fix test failures in streaming mode of PortableValidatesRunner
> --
>
> Key: BEAM-6326
> URL: https://issues.apache.org/jira/browse/BEAM-6326
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> As of BEAM-6009, the tests are run separately for batch and streaming. This 
> has revealed issues with a couple of tests which need to be addressed.
> The Gradle task is: {{validatesPortableRunnerStreaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6326) Fix test failures in streaming mode of PortableValidatesRunner

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6326?focusedWorklogId=183820&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183820
 ]

ASF GitHub Bot logged work on BEAM-6326:


Author: ASF GitHub Bot
Created on: 10/Jan/19 17:24
Start Date: 10/Jan/19 17:24
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #7453: [BEAM-6326] Fix 
portable stateful processing with side input
URL: https://github.com/apache/beam/pull/7453#discussion_r246850175
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 ##
 @@ -632,10 +635,38 @@ private void translateStreamingImpulse(
     if (transformedSideInputs.unionTagToView.isEmpty()) {
       outputStream = inputDataStream.transform(operatorName, outputTypeInformation, doFnOperator);
     } else {
-      outputStream =
-          inputDataStream
-              .connect(transformedSideInputs.unionedSideInputs.broadcast())
-              .transform(operatorName, outputTypeInformation, doFnOperator);
+      DataStream sideInputStream =
+          transformedSideInputs.unionedSideInputs.broadcast();
+      if (stateful) {
+        // We have to manually construct the two-input transform because we're not
+        // allowed to have only one input keyed, normally. Since Flink 1.5.0 it's
+        // possible to use the Broadcast State Pattern which provides a more elegant
 
 Review comment:
   Do you think it is worth changing the DoFnOperator to use broadcast state? 
If so, then perhaps create a ticket for it. On the other hand, if in the future 
Beam were to support keyed side inputs, then this approach would be more flexible.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183820)
Time Spent: 2h 40m  (was: 2.5h)

> Fix test failures in streaming mode of PortableValidatesRunner
> --
>
> Key: BEAM-6326
> URL: https://issues.apache.org/jira/browse/BEAM-6326
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> As of BEAM-6009, the tests are run separately for batch and streaming. This 
> has revealed issues with a couple of tests which need to be addressed.
> The Gradle task is: {{validatesPortableRunnerStreaming}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=183816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183816
 ]

ASF GitHub Bot logged work on BEAM-6392:


Author: ASF GitHub Bot
Created on: 10/Jan/19 17:14
Start Date: 10/Jan/19 17:14
Worklog Time Spent: 10m 
  Work Description: kanterov commented on issue #7441: [BEAM-6392] Add 
support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-453176331
 
 
   @kmjung this is awesome, I've been waiting for this feature. Is there any 
documentation for BigQuery API for direct reads? Couldn't find any.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183816)
Time Spent: 40m  (was: 0.5h)

> Add support for new BigQuery streaming read API to BigQueryIO
> -
>
> Key: BEAM-6392
> URL: https://issues.apache.org/jira/browse/BEAM-6392
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Kenneth Jung
>Assignee: Kenneth Jung
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> BigQuery has developed a new streaming egress API which will soon reach 
> public availability. Add support for the new API in BigQueryIO.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6392) Add support for new BigQuery streaming read API to BigQueryIO

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6392?focusedWorklogId=183815&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183815
 ]

ASF GitHub Bot logged work on BEAM-6392:


Author: ASF GitHub Bot
Created on: 10/Jan/19 17:12
Start Date: 10/Jan/19 17:12
Worklog Time Spent: 10m 
  Work Description: kanterov commented on issue #7441: [BEAM-6392] Add 
support for the BigQuery read API to BigQueryIO.
URL: https://github.com/apache/beam/pull/7441#issuecomment-453176331
 
 
   @kmjung this is awesome, I've been waiting for this feature. Is there any 
documentation for BigQuery API for direct reads? Couldn't find one.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183815)
Time Spent: 0.5h  (was: 20m)

> Add support for new BigQuery streaming read API to BigQueryIO
> -
>
> Key: BEAM-6392
> URL: https://issues.apache.org/jira/browse/BEAM-6392
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Kenneth Jung
>Assignee: Kenneth Jung
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> BigQuery has developed a new streaming egress API which will soon reach 
> public availability. Add support for the new API in BigQueryIO.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183811&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183811
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 17:05
Start Date: 10/Jan/19 17:05
Worklog Time Spent: 10m 
  Work Description: juan-rael commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246843576
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_test.py
 ##
 @@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtable_io_write import BigtableWriteConfiguration
+from apache_beam.io.gcp.bigtable_io_write import WriteToBigtable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+
+
+class GenerateDirectRows(beam.DoFn):
+  """Generates DirectRow objects for the Beam pipeline to process."""
+
+  def process(self, row_values):
+    """Builds a DirectRow from a single element.
+
+    :type row_values: dict
+    :param row_values: dict with a row_key and a row_content list, each
+      entry carrying the column_family_id, column_id, and value of a cell.
+    """
+    direct_row = row.DirectRow(row_key=row_values["row_key"])
+
+    for row_value in row_values["row_content"]:
+      direct_row.set_cell(row_value["column_family_id"],
+                          row_value["column_id"],
+                          row_value["value"],
+                          datetime.datetime.now())
+
+    # Emit the populated row so downstream transforms receive it.
+    yield direct_row
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteIT(unittest.TestCase):
+  """ Bigtable Write Connector Test
+
+  """
+  DEFAULT_TABLE_PREFIX = "python-test"
+  instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  number = 500
+  LOCATION_ID = "us-east1-b"
+
+  def setUp(self):
+    try:
+      from google.cloud.bigtable import enums
+      self.STORAGE_TYPE = enums.StorageType.HDD
+    except ImportError:
+      self.STORAGE_TYPE = 2
+
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+    self.client = Client(project=self.project, admin=True)
+    self._create_instance_table()
+
+  def tearDown(self):
+    instance = self.client.instance(self.instance_id)
+    if instance.exists():
+      instance.delete()
+
+  def test_bigtable_write_python(self):
+    number = self.number
+    config = BigtableWriteConfiguration(self.project, self.instance_id,
+                                        self.table_id)
+    pipeline_args = self.test_pipeline.options_list
+    pipeline_options = PipelineOptions(pipeline_args)
+
+    row_values = self._generate_mutation_data(number)
+
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+      _ = (
+          pipeline
+          | 'Generate Row Values' >> beam.Create(row_values)
+          | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+          | 'Write to BT' >> beam.ParDo(WriteToBigtable(config)))
+
+      result = pipeline.run()
+      result.wait_until_finish()
+
+      assert result.state == PipelineState.DONE
+
+      if not hasattr(result, 'has_job') or result.has_job:
+        read_filter = MetricsFilter().with_name('Written Row')
+ 

[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183805&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183805
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 16:56
Start Date: 10/Jan/19 16:56
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #7459: [BEAM-6405] Reduce 
parallelism of PortableValidatesRunner tests
URL: https://github.com/apache/beam/pull/7459#issuecomment-453170056
 
 
   Nice! We had the same issue with the Python VR tests some time ago; this 
should cut down resource consumption significantly.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183805)
Time Spent: 1h 20m  (was: 1h 10m)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The PVR tests seem to pass fine and then fail consecutively for no reason: 
> https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. the number of available cores, 
> is responsible for the flakiness when there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6405) Improve PortableValidatesRunner test reliability on Jenkins

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6405?focusedWorklogId=183804&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183804
 ]

ASF GitHub Bot logged work on BEAM-6405:


Author: ASF GitHub Bot
Created on: 10/Jan/19 16:54
Start Date: 10/Jan/19 16:54
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #7459: [BEAM-6405] 
Reduce parallelism of PortableValidatesRunner tests
URL: https://github.com/apache/beam/pull/7459
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183804)
Time Spent: 1h 10m  (was: 1h)

> Improve PortableValidatesRunner test reliability on Jenkins
> ---
>
> Key: BEAM-6405
> URL: https://issues.apache.org/jira/browse/BEAM-6405
> Project: Beam
>  Issue Type: Test
>  Components: runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The PVR tests seem to pass fine and then fail consecutively for no reason: 
> https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/ 
> It looks like the outrageous parallelism, i.e. the number of available cores, 
> is responsible for the flakiness when there is additional load on the build 
> slaves. We should lower the parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183800&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183800
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 16:44
Start Date: 10/Jan/19 16:44
Worklog Time Spent: 10m 
  Work Description: juan-rael commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246835084
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_write.py
 ##
 @@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+BigTable connector can be used as main outputs. A main output
+(common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below
+we created a list of rows then passed to the GeneratedDirectRows
+DoFn to set the Cells and then we call the WriteToBigtable to insert
+those generated rows in the table.
+
+  main_table = (p
+   | 'Generate Row Values' >> beam.Create(row_values)
+   | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+   | 'Write to BT' >> beam.ParDo(WriteToBigtable(beam_options)))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+except ImportError:
+  pass
+
+
+class WriteToBigtable(beam.DoFn):
+  """Creates the connector and adds each row in the Beam pipeline to the
+  batcher.
+
+  :type beam_options: class:`~bigtable_configuration.BigtableConfiguration`
+  :param beam_options: class `~bigtable_configuration.BigtableConfiguration`
+  """
+
+  def __init__(self, beam_options):
+    super(WriteToBigtable, self).__init__(beam_options)
+    self.beam_options = beam_options
+    self.client = None
+    self.instance = None
+    self.table = None
+    self.batcher = None
+    self._app_profile_id = self.beam_options.app_profile_id
+    self.flush_count = self.beam_options.flush_count
+    self.max_row_bytes = self.beam_options.max_row_bytes
+    self.written = Metrics.counter(self.__class__, 'Written Row')
+
+  def start_bundle(self):
+    if self.client is None:
+      if self.beam_options.credentials is None:
+        self.client = Client(project=self.beam_options.project_id,
+                             admin=True)
+      else:
+        self.client = Client(project=self.beam_options.project_id,
+                             credentials=self.beam_options.credentials,
+                             admin=True)
+    if self.instance is None:
+      self.instance = self.client.instance(self.beam_options.instance_id)
+    if self.table is None:
+      self.table = self.instance.table(self.beam_options.table_id,
+                                       self._app_profile_id)
+
+    self.batcher = MutationsBatcher(self.table, flush_count=self.flush_count,
+                                    max_row_bytes=self.max_row_bytes)
+
+  def process(self, row):
+    self.written.inc()
+    self.batcher.mutate(row)
+
+  def finish_bundle(self):
+    return self.batcher.flush()
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self.beam_options.project_id,
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self.beam_options.instance_id,
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self.beam_options.table_id,
+                                       label='Bigtable Table Id'),
+            'bigtableOptions': DisplayDataItem(str(self.beam_options),
+                                               label='Bigtable 

[jira] [Commented] (BEAM-5676) Allow to change the PubsubClientFactory when using PubsubIO

2019-01-10 Thread Logan HAUSPIE (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739597#comment-16739597
 ] 

Logan HAUSPIE commented on BEAM-5676:
-

I will do it next month if no one else has done it before me.

> Allow to change the PubsubClientFactory when using PubsubIO
> ---
>
> Key: BEAM-5676
> URL: https://issues.apache.org/jira/browse/BEAM-5676
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.7.0
>Reporter: Logan HAUSPIE
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When we use PubSub to push or pull messages, we currently have no choice of 
> serialization implementation because PubsubIO internally uses the 
> *_PubsubJsonClient.FACTORY_* of type _*PubsubJsonClientFactory*_ to serialize 
> or deserialize messages.
> It would be nice to be able to set a different factory (e.g. 
> *_PubsubGrpcClientFactory_*) to decrease the size of messages (and thus 
> decrease the cost of using Pubsub).
>  
> I guess it could be possible to write something like:
> {{PubsubIO.Write write =}}
>  {{    PubsubIO.writeMessages()}}
>  {{            .to("projects/project/topics/topic")}}
>  {{            *.withFactory(PubsubGrpcClient.FACTORY)*}}
>  {{            .withTimestampAttribute("timestamp")}}{{;}}
> or 
> {{PubsubIO.Read read = }}
>  {{    PubsubIO.readMessages()}}
>  {{            .fromSubscription("projects/project/subscriptions/name")}}
>  {{            *.withFactory(PubsubGrpcClient.FACTORY)*}}
>  {{            .withTimestampAttribute("timestamp");}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5614) Using gs:// paths without first doing a "gcloud auth" gives an unhelpful error message

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5614:


Assignee: (was: Chamikara Jayalath)

> Using gs:// paths without first doing a "gcloud auth" gives an unhelpful 
> error message
> --
>
> Key: BEAM-5614
> URL: https://issues.apache.org/jira/browse/BEAM-5614
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Udi Meiri
>Priority: Major
>
> Users see an error like:
> java.lang.IllegalArgumentException: Error constructing default value for 
> gcpTempLocation: tempLocation is not a valid GCS path, gs://bucket/path/.
> Also reported here: 
> https://stackoverflow.com/questions/43026371/apache-beam-minimalwordcount-example-with-dataflow-runner-on-eclipse



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5420) BigtableIO tries to get runtime parameters when collecting display data at pipeline construction time

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5420:


Assignee: Pablo Estrada  (was: Chamikara Jayalath)

> BigtableIO tries to get runtime parameters when collecting display data at 
> pipeline construction time
> -
>
> Key: BEAM-5420
> URL: https://issues.apache.org/jira/browse/BEAM-5420
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Kevin Si
>Assignee: Pablo Estrada
>Priority: Minor
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> For example: 
> [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java#L165]
> At Dataflow pipeline construction time calling getProjectId() gives an error.
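A common guard for this class of bug is to avoid calling get() on a runtime-provided ValueProvider while building display data, checking isAccessible() first. A simplified sketch of that pattern (hypothetical helper, not the actual BigtableConfig code):

{code:java}
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;

class DisplayDataGuard {
  // Renders projectId into display data only when its value is already
  // available; at pipeline construction time a runtime-provided
  // ValueProvider is not accessible and get() would throw.
  static void populate(DisplayData.Builder builder, ValueProvider<String> projectId) {
    if (projectId != null && projectId.isAccessible()) {
      builder.add(DisplayData.item("projectId", projectId.get()));
    }
  }
}
{code}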



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable Python connector

2019-01-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=183786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-183786
 ]

ASF GitHub Bot logged work on BEAM-3342:


Author: ASF GitHub Bot
Created on: 10/Jan/19 16:25
Start Date: 10/Jan/19 16:25
Worklog Time Spent: 10m 
  Work Description: sduskis commented on pull request #7367: [BEAM-3342] 
Create a Cloud Bigtable Python connector Write
URL: https://github.com/apache/beam/pull/7367#discussion_r246822610
 
 

 ##
 File path: sdks/python/apache_beam/io/gcp/bigtable_io_write.py
 ##
 @@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+BigTable connector can be used as main outputs. A main output
+(common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below
+we created a list of rows then passed to the GeneratedDirectRows
+DoFn to set the Cells and then we call the WriteToBigtable to insert
+those generated rows in the table.
+
+  main_table = (p
+   | 'Generate Row Values' >> beam.Create(row_values)
+   | 'Generate Direct Rows' >> beam.ParDo(GenerateDirectRows())
+   | 'Write to BT' >> beam.ParDo(WriteToBigtable(beam_options)))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+  from google.cloud.bigtable.batcher import MutationsBatcher
+except ImportError:
+  pass
+
+
+class WriteToBigtable(beam.DoFn):
+  """Creates the connector and adds each row in the Beam pipeline to the
+  batcher.
+
+  :type beam_options: class:`~bigtable_configuration.BigtableConfiguration`
+  :param beam_options: class `~bigtable_configuration.BigtableConfiguration`
+  """
+
+  def __init__(self, beam_options):
+    super(WriteToBigtable, self).__init__(beam_options)
+    self.beam_options = beam_options
+    self.client = None
+    self.instance = None
+    self.table = None
+    self.batcher = None
+    self._app_profile_id = self.beam_options.app_profile_id
 
 Review comment:
   @juan-rael, please don't create class variables for `beam_options`.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 183786)

> Create a Cloud Bigtable Python connector
> 
>
> Key: BEAM-3342
> URL: https://issues.apache.org/jira/browse/BEAM-3342
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-4904) Beam Dependency Update Request: de.flapdoodle.embed:de.flapdoodle.embed.mongo 2.1.1

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-4904:


Assignee: (was: Chamikara Jayalath)

> Beam Dependency Update Request: de.flapdoodle.embed:de.flapdoodle.embed.mongo 
> 2.1.1
> ---
>
> Key: BEAM-4904
> URL: https://issues.apache.org/jira/browse/BEAM-4904
> Project: Beam
>  Issue Type: Sub-task
>  Components: dependencies
>Reporter: Beam JIRA Bot
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> 2018-07-25 20:23:49.911490
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-08-06 12:09:30.976479
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-08-13 12:09:48.188897
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-08-20 12:12:32.344889
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-08-27 12:14:00.846640
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-09-03 12:26:28.154799
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-09-10 12:15:38.875437
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-09-17 12:18:29.083517
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 
> 2018-09-24 12:25:56.575239
> Please review and upgrade the 
> de.flapdoodle.embed:de.flapdoodle.embed.mongo to the latest version 2.1.1 
>  
> cc: 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5417) FileSystems.match behaviour diff between GCS and local file system

2019-01-10 Thread Chamikara Jayalath (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739591#comment-16739591
 ] 

Chamikara Jayalath commented on BEAM-5417:
--

Joar, did you try with the latest version of Beam?

> FileSystems.match behaviour diff between GCS and local file system
> --
>
> Key: BEAM-5417
> URL: https://issues.apache.org/jira/browse/BEAM-5417
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Joar Wandborg
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> Given the directory structure:
>  
> {noformat}
> .
> ├── filesystem-match-test
> │   ├── a
> │   │   └── file.txt
> │   └── b
> │   └── file.txt
> └── filesystem-match-test.py
> {noformat}
>  
> Where {{filesystem-match-test.py}} contains:
> {code:python}
> from __future__ import print_function
> import os
> import posixpath
> from apache_beam.io.filesystem import MatchResult
> from apache_beam.io.filesystems import FileSystems
> BASES = [
> os.path.join(os.path.dirname(__file__), "./"),
> "gs://my-bucket/test/",
> ]
> pattern = "filesystem-match-test/*/file.txt"
> for base_path in BASES:
> full_pattern = posixpath.join(base_path, pattern)
> print("full_pattern: {}".format(full_pattern))
> match_result = FileSystems.match([full_pattern])[0]  # type: MatchResult
> print("metadata list: {}".format(match_result.metadata_list))
> {code}
> Running {{python filesystem-match-test.py}} does not match any files locally, 
> but does match files on GCS:
> {noformat}
> full_pattern: ./filesystem-match-test/*/file.txt
> metadata list: []
> full_pattern: gs://my-bucket/test/filesystem-match-test/*/file.txt
> metadata list: 
> [FileMetadata(gs://my-bucket/test/filesystem-match-test/a/file.txt, 6), 
> FileMetadata(gs://my-bucket/test/filesystem-match-test/b/file.txt, 6)]
> {noformat}
> The expected result is that a/file.txt and b/file.txt should be matched for 
> both patterns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5934) FileSink affected by S3 eventual consistency

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5934:


Assignee: (was: Chamikara Jayalath)

> FileSink affected by S3 eventual consistency
> 
>
> Key: BEAM-5934
> URL: https://issues.apache.org/jira/browse/BEAM-5934
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-hadoop
>Affects Versions: 2.7.0
>Reporter: Pawel Bartoszek
>Priority: Major
>
> After upgrading to BEAM 2.7.0 and Flink 1.5.2 (from BEAM 2.2.0), the job 
> throws a _No such file or directory_ exception a few times a day when trying 
> to move a temp bundle to its final location.
> After digging into the code, it looks like a kind of S3 eventual-consistency 
> problem, where the HEAD request used to check whether the temporary file 
> exists before copying it to the final location returns 404 and the whole 
> copy operation fails.
> We use sharded writes (32). The job outputs around 256 files a minute, but 
> the exception is thrown at most 3 times a day, which suggests there is a 
> race condition somewhere.
>  
> The case where S3 enforces eventual consistency is when a check for the 
> file's existence is made before uploading it. I checked the BEAM FileSink 
> and couldn't find any logic that pre-checks whether the temp bundle file 
> exists. 
>  
> {code:java}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.{code}
>  
> The logs from the job
> {code:java}
> 2018-10-29 17:45:03,873 INFO org.apache.beam.sdk.io.WriteFiles - Opening 
> writer f990d5a0-d5a8-4ce2-adee-baa01e294ae4 for window 
> [2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z) pane 
> PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0} destination 
> null
> 2018-10-29 17:45:04,043 INFO org.apache.beam.sdk.io.FileBasedSink$Writer - 
> Successfully wrote temporary file 
> s3a:/XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> 2018-10-29 17:45:05,437 INFO org.apache.beam.sdk.io.FileBasedSink - Will copy 
> temporary file FileResult{tempFilename=s3a://XXX/beam/.temp-beam-2018-
> 10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4, shard=9, 
> window=[2018-10-29T17:43:00.000Z..2018-10-29T17:44:00.000Z), 
> paneInfo=PaneInfo{isFirst=true, timing=ON_TIME, index=0, onTimeIndex=0}} to 
> final location s3a://XXX/rdot-17:43-17:44-pane-0-on_time-first-9.gz{code}
>  
> {code:java}
> Caused by: org.apache.beam.sdk.util.UserCodeException: 
> java.io.FileNotFoundException: No such file or directory: 
> s3a://XXX/beam/.temp-beam-2018-10-29_13-56-20-1/f990d5a0-d5a8-4ce2-adee-baa01e294ae4
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
> at 
> org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at 
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:812)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:789)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:309)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:77)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:621)
> at 
> 
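Given the read-after-write caveat quoted in the description, one mitigation (wherever the race turns out to live) is to treat a 404 on a freshly written key as transient and retry the copy with backoff instead of failing the bundle. A generic sketch of that idea (hypothetical helper, not Beam's FileSystems API):

{code:java}
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;

public class EventualConsistencyRetry {
  // Retries an operation (e.g. a copy of a just-written S3 object) that may
  // observe a stale 404 shortly after the PUT.
  public static <T> T withRetries(Callable<T> copy, int maxAttempts, long initialBackoffMs)
      throws Exception {
    long backoffMs = initialBackoffMs;
    for (int attempt = 1; ; attempt++) {
      try {
        return copy.call();
      } catch (FileNotFoundException e) {
        if (attempt >= maxAttempts) {
          throw e; // the object is still missing after several reads; give up
        }
        Thread.sleep(backoffMs); // wait for read-after-write to converge
        backoffMs *= 2;
      }
    }
  }
}
{code}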

[jira] [Assigned] (BEAM-5676) Allow to change the PubsubClientFactory when using PubsubIO

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5676:


Assignee: (was: Chamikara Jayalath)

> Allow to change the PubsubClientFactory when using PubsubIO
> ---
>
> Key: BEAM-5676
> URL: https://issues.apache.org/jira/browse/BEAM-5676
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.7.0
>Reporter: Logan HAUSPIE
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When we use PubSub to push or pull messages, we currently have no choice of 
> serialization implementation because PubsubIO internally uses the 
> *_PubsubJsonClient.FACTORY_* of type _*PubsubJsonClientFactory*_ to serialize 
> or deserialize messages.
> It would be nice to be able to set a different factory (e.g. 
> *_PubsubGrpcClientFactory_*) to decrease the size of messages (and thus 
> decrease the cost of using Pubsub).
>  
> I guess it could be possible to write something like:
> {{PubsubIO.Write write =}}
>  {{    PubsubIO.writeMessages()}}
>  {{            .to("projects/project/topics/topic")}}
>  {{            *.withFactory(PubsubGrpcClient.FACTORY)*}}
>  {{            .withTimestampAttribute("timestamp")}}{{;}}
> or 
> {{PubsubIO.Read read = }}
>  {{    PubsubIO.readMessages()}}
>  {{            .fromSubscription("projects/project/subscriptions/name")}}
>  {{            *.withFactory(PubsubGrpcClient.FACTORY)*}}
>  {{            .withTimestampAttribute("timestamp");}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5417) FileSystems.match behaviour diff between GCS and local file system

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5417:


Assignee: Udi Meiri  (was: Chamikara Jayalath)

> FileSystems.match behaviour diff between GCS and local file system
> --
>
> Key: BEAM-5417
> URL: https://issues.apache.org/jira/browse/BEAM-5417
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Joar Wandborg
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> Given the directory structure:
>  
> {noformat}
> .
> ├── filesystem-match-test
> │   ├── a
> │   │   └── file.txt
> │   └── b
> │   └── file.txt
> └── filesystem-match-test.py
> {noformat}
>  
> Where {{filesystem-match-test.py}} contains:
> {code:python}
> from __future__ import print_function
> import os
> import posixpath
> from apache_beam.io.filesystem import MatchResult
> from apache_beam.io.filesystems import FileSystems
> BASES = [
> os.path.join(os.path.dirname(__file__), "./"),
> "gs://my-bucket/test/",
> ]
> pattern = "filesystem-match-test/*/file.txt"
> for base_path in BASES:
> full_pattern = posixpath.join(base_path, pattern)
> print("full_pattern: {}".format(full_pattern))
> match_result = FileSystems.match([full_pattern])[0]  # type: MatchResult
> print("metadata list: {}".format(match_result.metadata_list))
> {code}
> Running {{python filesystem-match-test.py}} does not match any files locally, 
> but does match files on GCS:
> {noformat}
> full_pattern: ./filesystem-match-test/*/file.txt
> metadata list: []
> full_pattern: gs://my-bucket/test/filesystem-match-test/*/file.txt
> metadata list: 
> [FileMetadata(gs://my-bucket/test/filesystem-match-test/a/file.txt, 6), 
> FileMetadata(gs://my-bucket/test/filesystem-match-test/b/file.txt, 6)]
> {noformat}
> The expected result is that a/file.txt and b/file.txt should be matched for 
> both patterns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5676) Allow to change the PubsubClientFactory when using PubsubIO

2019-01-10 Thread Chamikara Jayalath (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739589#comment-16739589
 ] 

Chamikara Jayalath commented on BEAM-5676:
--

Logan, please feel free to send a pull request if you have time.

 

cc: [~reuvenlax] [~mil...@google.com]

> Allow to change the PubsubClientFactory when using PubsubIO
> ---
>
> Key: BEAM-5676
> URL: https://issues.apache.org/jira/browse/BEAM-5676
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.7.0
>Reporter: Logan HAUSPIE
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When we use Pub/Sub to push or pull messages, we currently have no choice of 
> serialization implementation, because PubsubIO internally uses 
> *_PubsubJsonClient.FACTORY_* of type _*PubsubJsonClientFactory*_ to serialize 
> and deserialize messages.
> It would be nice to be able to set a different factory (e.g. 
> *_PubsubGrpcClientFactory_*) to decrease the size of messages (and thus the 
> cost of using Pub/Sub).
>  
> I guess it could be possible to write something like:
> {code:java}
> PubsubIO.Write<PubsubMessage> write =
>     PubsubIO.writeMessages()
>         .to("projects/project/topics/topic")
>         .withFactory(PubsubGrpcClient.FACTORY) // proposed new method
>         .withTimestampAttribute("timestamp");
> {code}
> or
> {code:java}
> PubsubIO.Read<PubsubMessage> read =
>     PubsubIO.readMessages()
>         .fromSubscription("projects/project/subscriptions/name")
>         .withFactory(PubsubGrpcClient.FACTORY) // proposed new method
>         .withTimestampAttribute("timestamp");
> {code}
>  
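> A rough sketch of what the proposed setter could look like inside 
> PubsubIO.Read (hypothetical: the builder property setPubsubClientFactory does 
> not exist today; PubsubClient.PubsubClientFactory is the internal interface 
> implemented by both PubsubJsonClient.FACTORY and PubsubGrpcClient.FACTORY):
> {code:java}
> /** Hypothetical addition to PubsubIO.Read, mirroring the existing with* setters. */
> public Read<T> withFactory(PubsubClient.PubsubClientFactory factory) {
>   // Assumes a new AutoValue property on the Read builder.
>   return toBuilder().setPubsubClientFactory(factory).build();
> }
> {code}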



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-5694) OOMs on Pub/Sub to BigQuery via FILE_LOADS

2019-01-10 Thread Chamikara Jayalath (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739588#comment-16739588
 ] 

Chamikara Jayalath edited comment on BEAM-5694 at 1/10/19 4:31 PM:
---

CC: [~reuvenlax]


was (Author: chamikara):
R: [~reuvenlax]

> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> --
>
> Key: BEAM-5694
> URL: https://issues.apache.org/jira/browse/BEAM-5694
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.7.0
> Environment: Google Dataflow
>Reporter: Coda Hale
>Assignee: Chamikara Jayalath
>Priority: Major
>
> I've got a streaming Dataflow job which streams data from a Pub/Sub 
> subscription to a single BigQuery table, and I'm experimenting with moving it 
> to batch loads via BigQueryIO.Write.Method.FILE_LOADS. The only way I can get 
> the job to run successfully is by increasing worker memory from 15GB to 52GB, 
> which seems like a lot.
> I haven't been able to get a heap dump, but observing the job I can see ~5GB 
> of records accumulate in GroupByDestination before the trigger duration 
> elapses and WriteGroupedRecords processes them, at which point I see OOM 
> errors in WriteGroupedRecords:
> {noformat}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
>         org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> {noformat}
> Like I said, I can resolve this by running the job on n1-highmem-8 machines, 
> but this seems odd. The job explicitly shards data to keep per-worker 
> requirements low, yet there is still a per-worker bottleneck roughly the size 
> of the entire dataset. Increasing numFileShards doesn't seem to affect this 
> either: going from 100 to 1,000 to 10,000 changed the number of files but not 
> the OOMs.
> The pipeline is fairly standard, but here's the code, edited for 
> confidentiality:
> {code:java}
> pipeline
>     .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
>     .apply("Transform", ParDo.of(new MtoNFunction()))
>     .apply(
>         "Write",
>         BigQueryIO.write()
>             .withFormatFunction(a -> a)
>             .to(tableRef)
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(Duration.standardMinutes(5))
>             .withNumFileShards(100_000)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>             .withJsonSchema("redacted")
>             .withCustomGcsTempLocation(
>                 ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
> {code}
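> One variable worth isolating (a hedged suggestion, not part of the original 
> report): FILE_LOADS buffers everything that arrives within one triggering 
> window before the load job runs, so a shorter triggering frequency bounds how 
> much can accumulate per trigger. A minimal variation of the write above:
> {code:java}
> BigQueryIO.write()
>     .withFormatFunction(a -> a)
>     .to(tableRef)
>     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>     // Flush accumulated records more often to cap per-trigger buffering.
>     .withTriggeringFrequency(Duration.standardMinutes(1))
>     .withNumFileShards(100)
>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>     .withJsonSchema("redacted");
> {code}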



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5694) OOMs on Pub/Sub to BigQuery via FILE_LOADS

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5694:


Assignee: (was: Chamikara Jayalath)

> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> --
>
> Key: BEAM-5694
> URL: https://issues.apache.org/jira/browse/BEAM-5694
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.7.0
> Environment: Google Dataflow
>Reporter: Coda Hale
>Priority: Major
>
> I've got a streaming Dataflow job which streams data from a Pub/Sub 
> subscription to a single BigQuery table, and I'm experimenting with moving it 
> to batch loads via BigQueryIO.Write.Method.FILE_LOADS. The only way I can get 
> the job to run successfully is by increasing worker memory from 15GB to 52GB, 
> which seems like a lot.
> I haven't been able to get a heap dump, but observing the job I can see ~5GB 
> of records accumulate in GroupByDestination before the trigger duration 
> elapses and WriteGroupedRecords processes them, at which point I see OOM 
> errors in WriteGroupedRecords:
> {noformat}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
>         org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> {noformat}
> Like I said, I can resolve this by running the job on n1-highmem-8 machines, 
> but this seems odd. The job explicitly shards data to keep per-worker 
> requirements low, yet there is still a per-worker bottleneck roughly the size 
> of the entire dataset. Increasing numFileShards doesn't seem to affect this 
> either: going from 100 to 1,000 to 10,000 changed the number of files but not 
> the OOMs.
> The pipeline is fairly standard, but here's the code, edited for 
> confidentiality:
> {code:java}
> pipeline
>     .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
>     .apply("Transform", ParDo.of(new MtoNFunction()))
>     .apply(
>         "Write",
>         BigQueryIO.write()
>             .withFormatFunction(a -> a)
>             .to(tableRef)
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(Duration.standardMinutes(5))
>             .withNumFileShards(100_000)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>             .withJsonSchema("redacted")
>             .withCustomGcsTempLocation(
>                 ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5694) OOMs on Pub/Sub to BigQuery via FILE_LOADS

2019-01-10 Thread Chamikara Jayalath (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739588#comment-16739588
 ] 

Chamikara Jayalath commented on BEAM-5694:
--

R: [~reuvenlax]

> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> --
>
> Key: BEAM-5694
> URL: https://issues.apache.org/jira/browse/BEAM-5694
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.7.0
> Environment: Google Dataflow
>Reporter: Coda Hale
>Assignee: Chamikara Jayalath
>Priority: Major
>
> I've got a streaming Dataflow job which streams data from a Pub/Sub 
> subscription to a single BigQuery table, and I'm experimenting with moving it 
> to batch loads via BigQueryIO.Write.Method.FILE_LOADS. The only way I can get 
> the job to run successfully is by increasing worker memory from 15GB to 52GB, 
> which seems like a lot.
> I haven't been able to get a heap dump, but observing the job I can see ~5GB 
> of records accumulate in GroupByDestination before the trigger duration 
> elapses and WriteGroupedRecords processes them, at which point I see OOM 
> errors in WriteGroupedRecords:
> {noformat}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
>         org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> {noformat}
> Like I said, I can resolve this by running the job on n1-highmem-8 machines, 
> but this seems odd. The job explicitly shards data to keep per-worker 
> requirements low, yet there is still a per-worker bottleneck roughly the size 
> of the entire dataset. Increasing numFileShards doesn't seem to affect this 
> either: going from 100 to 1,000 to 10,000 changed the number of files but not 
> the OOMs.
> The pipeline is fairly standard, but here's the code, edited for 
> confidentiality:
> {code:java}
> pipeline
>     .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
>     .apply("Transform", ParDo.of(new MtoNFunction()))
>     .apply(
>         "Write",
>         BigQueryIO.write()
>             .withFormatFunction(a -> a)
>             .to(tableRef)
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(Duration.standardMinutes(5))
>             .withNumFileShards(100_000)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>             .withJsonSchema("redacted")
>             .withCustomGcsTempLocation(
>                 ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5929) BigQueryIO DynamicDestination should support JSONSchema

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5929:


Assignee: (was: Chamikara Jayalath)

> BigQueryIO DynamicDestination should support JSONSchema
> ---
>
> Key: BEAM-5929
> URL: https://issues.apache.org/jira/browse/BEAM-5929
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Benson Tucker
>Priority: Minor
>
> JSON-formatted String schemas for BQ tables should be allowed in the 
> getSchema method of a DynamicDestinations, allowing dynamically-targeted 
> writes to take advantage of a data model like the one expected by 
> `withSchemaFromView`.
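> A minimal sketch of the shape this could take (a sketch only: the "table" 
> routing field and the schema map are placeholders, and parsing via the Google 
> API client's JSON factory is one possible approach, not an existing Beam 
> helper):
> {code:java}
> import com.google.api.client.json.jackson2.JacksonFactory;
> import com.google.api.services.bigquery.model.TableRow;
> import com.google.api.services.bigquery.model.TableSchema;
> import java.io.IOException;
> import java.util.HashMap;
> import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
> import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
> import org.apache.beam.sdk.values.ValueInSingleWindow;
> 
> class JsonSchemaDestinations extends DynamicDestinations<TableRow, String> {
>   // Table name -> JSON-formatted schema string of the form {"fields": [...]}.
>   private final HashMap<String, String> jsonSchemasByTable;
> 
>   JsonSchemaDestinations(HashMap<String, String> jsonSchemasByTable) {
>     this.jsonSchemasByTable = jsonSchemasByTable;
>   }
> 
>   @Override
>   public String getDestination(ValueInSingleWindow<TableRow> element) {
>     return (String) element.getValue().get("table"); // placeholder routing field
>   }
> 
>   @Override
>   public TableDestination getTable(String tableName) {
>     return new TableDestination("my-project:my_dataset." + tableName, null);
>   }
> 
>   @Override
>   public TableSchema getSchema(String tableName) {
>     try {
>       // Parse the JSON schema string into the BQ model object.
>       return JacksonFactory.getDefaultInstance()
>           .fromString(jsonSchemasByTable.get(tableName), TableSchema.class);
>     } catch (IOException e) {
>       throw new RuntimeException(e);
>     }
>   }
> }
> {code}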



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-5635) FR: Enable Transactional writes with DatastoreIO

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-5635:


Assignee: (was: Chamikara Jayalath)

> FR: Enable Transactional writes with DatastoreIO
> 
>
> Key: BEAM-5635
> URL: https://issues.apache.org/jira/browse/BEAM-5635
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Alex Amato
>Priority: Major
>
> I have seen a user who would like to use Datastore transactions to roll back 
> a set of records if one of them fails to write. Let's consider this use case 
> for DatastoreIO.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5966) When working "LiteralGqlQuery", Error: Query cannot have any sort orders.

2019-01-10 Thread Chamikara Jayalath (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739585#comment-16739585
 ] 

Chamikara Jayalath commented on BEAM-5966:
--

Was your question resolved, or is there something more to address here?

> When working "LiteralGqlQuery", Error: Query cannot have any sort orders.
> -
>
> Key: BEAM-5966
> URL: https://issues.apache.org/jira/browse/BEAM-5966
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.8.0
>Reporter: Murat ODUNC
>Assignee: Chamikara Jayalath
>Priority: Major
>
> Hello.
> I am using Apache Beam pipelines in my project, and I have a problem reading 
> data from GCP Datastore.
>  
> My first question is how to create the "Query" object. I'm sharing a sample 
> from the relevant JavaDoc:
> [https://beam.apache.org/releases/javadoc/2.8.0/]
> {code:java}
>  Query query = ...; // I have no idea how to configure this object
>  String projectId = "...";
>  Pipeline p = Pipeline.create(options);
>  PCollection entities = p.apply(
>  DatastoreIO.v1().read()
>  .withProjectId(projectId)
>  .withQuery(query));
> {code}
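> One way to build that object is through the generated proto builders, e.g. (a 
> sketch, not from the JavaDoc; the filter value type must match how createdAt 
> was actually stored):
> {code:java}
> import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
> import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
> 
> import com.google.datastore.v1.KindExpression;
> import com.google.datastore.v1.PropertyFilter;
> import com.google.datastore.v1.Query;
> 
> // Query for `task` entities; deliberately no ORDER BY, because DatastoreIO
> // splits the query across workers, and split queries cannot carry sort
> // orders (which is exactly what the error below complains about).
> Query query =
>     Query.newBuilder()
>         .addKind(KindExpression.newBuilder().setName("task"))
>         .setFilter(
>             makeFilter(
>                 "createdAt",
>                 PropertyFilter.Operator.GREATER_THAN,
>                 makeValue("2018-11-03T00:00:00Z").build()))
>         .build();
> {code}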
> My second question is: how do I set the "sort order" of the query?
> I tried 'LiteralGqlQuery' to read data, but the Datastore IO reader failed 
> with the following error trace:
> {noformat}
> java.lang.IllegalArgumentException: Query cannot have any sort orders.
>     at com.google.datastore.v1.client.QuerySplitterImpl.validateQuery(QuerySplitterImpl.java:128) ~[datastore-v1-proto-client-1.6.0.jar:na]
>     at com.google.datastore.v1.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:69) ~[datastore-v1-proto-client-1.6.0.jar:na]
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.splitQuery(DatastoreV1.java:454) ~[beam-sdks-java-io-google-cloud-platform-2.8.0.jar:na]
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.access$100(DatastoreV1.java:264) ~[beam-sdks-java-io-google-cloud-platform-2.8.0.jar:na]
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read$SplitQueryFn.processElement(DatastoreV1.java:813) ~[beam-sdks-java-io-google-cloud-platform-2.8.0.jar:na]
>     at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read$SplitQueryFn$DoFnInvoker.invokeProcessElement(Unknown Source) [na:na]
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275) [beam-runners-direct-java-2.8.0.jar:na]
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240) [beam-runners-direct-java-2.8.0.jar:na]
>     at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78) [beam-runners-direct-java-2.8.0.jar:na]
>     at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:207) [beam-runners-direct-java-2.8.0.jar:na]
>     at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55) [beam-runners-direct-java-2.8.0.jar:na]
>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160) [beam-runners-direct-java-2.8.0.jar:na]
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124) [beam-runners-direct-java-2.8.0.jar:na]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_162]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_162]
>     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
> {noformat}
> My code is here:
>  
> {code:java}
> PipelineOptions options = PipelineOptionsFactory.create();
> options.setRunner(DirectRunner.class);
> String gqlQuery = String.format(
>     "SELECT * FROM task "
>         + "WHERE createdAt > DATETIME('%s') "
>         + "AND createdAt < DATETIME('%s') ORDER BY createdAt ASC",
>     "2018-11-03T00:00:00Z", "2018-11-03T23:59:59Z");
> log.info("GQL: " + gqlQuery);
> String projectId = "foo-bar";
> String ns = "prod-01ce205a-22ff-4bab-b133-926bdc54c8b3";
> Pipeline p = Pipeline.create(options);
> PCollection entities = p.apply(
>     DatastoreIO.v1().read()
>         .withProjectId(projectId)
>         .withLiteralGqlQuery(gqlQuery)
>         .withQuery(query)
>         .withNamespace(ns))
>     .apply("Debug", ParDo.of(new DoFn(){
>       @ProcessElement
>       public void processElement(ProcessContext c) {

[jira] [Assigned] (BEAM-6061) When using unbounded PCollection from TextIO to BigQuery, data is stuck in Reshuffle/GroupByKey inside of BigQueryIO

2019-01-10 Thread Chamikara Jayalath (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-6061:


Assignee: (was: Chamikara Jayalath)

> When using unbounded PCollection from TextIO to BigQuery, data is stuck in 
> Reshuffle/GroupByKey inside of BigQueryIO
> 
>
> Key: BEAM-6061
> URL: https://issues.apache.org/jira/browse/BEAM-6061
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Affects Versions: 2.8.0
>Reporter: Florian Baumert
>Priority: Major
>
> As a short summary: when reading from TextIO with watchForNewFiles (an 
> unbounded collection) and writing with BigQueryIO in streaming mode, 
> BigQueryIO does not write any data. It is "stuck" in a GroupByKey internal to 
> Reshuffle.
>  
> The issue is detailed on Stack Overflow with code and version information:
> [https://stackoverflow.com/questions/53266689/when-using-unbounded-pcollection-from-textio-to-bigquery-data-is-stuck-in-reshu]
>  
> Thanks!
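> For concreteness, a minimal reconstruction of the pipeline shape being 
> described (paths, parse step, and table are placeholders; the actual code is 
> only in the linked Stack Overflow post):
> {code:java}
> PCollection<TableRow> rows =
>     p.apply(
>             "ReadAndWatch",
>             TextIO.read()
>                 .from("gs://my-bucket/incoming/*.json") // placeholder path
>                 .watchForNewFiles(Duration.standardSeconds(30), Watch.Growth.never()))
>         .apply("Parse", ParDo.of(new ParseToTableRowFn())); // placeholder DoFn
> 
> rows.apply(
>     "Write",
>     BigQueryIO.writeTableRows()
>         .to("my-project:dataset.table") // placeholder table
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
> {code}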



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

