[jira] [Created] (BEAM-6052) elasticsearchIO checkForErrors method bug
Fred k created BEAM-6052:

Summary: elasticsearchIO checkForErrors method bug
Key: BEAM-6052
URL: https://issues.apache.org/jira/browse/BEAM-6052
Project: Beam
Issue Type: Bug
Components: io-java-elasticsearch
Environment: beam-sdk-java-io-elasticsearch-2.8.0
Reporter: Fred k
Assignee: Etienne Chauchot

When I use Write to send a bulk update request to Elasticsearch, the following exception appears:

{code:java}
Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
{code}

I checked the checkForErrors method and found that it cannot parse a response that contains update items, so the exception message lists no failed documents. After I added logic to parse update items, the output looks like this:

{code:java}
Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id 1465285334751e039cc4883a8a270191: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id e2722c653c65a4cb119e9b8dc44e37ad: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id b25472e3665695c49861f6eceee5531a: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id 022c1accdae82f6fe4108ba7989732fc: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
{code}

The response content looks like this:

{
  "took": 293,
  "errors": true,
  "items": [
    { "update": { "_index": "test_kevin_2018-11", "_type": "kevin", "_id": "8d7664286c0887c637229166c7c613bc", "_version": 1, "result": "noop", "_shards": { "total": 1, "successful": 1, "failed": 0 }, "status": 200 } },
    { "update": { "_index": "test_kevin_2018-11", "_type": "kevin", "_id": "49952be98f4fc160f56bcdb33b1dbf7e", "status": 429, "error": { "type": "es_rejected_execution_exception", "reason": "rejected execution of org.elasticsearch.transport.TransportService$7@3f70bbe7 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 200, completed tasks = 10034174]]" } } }
  ]
}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
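To illustrate the kind of parsing the BEAM-6052 report asks for, here is a minimal sketch in plain Java. It is not the ElasticsearchIO implementation: it assumes the bulk response has already been parsed into nested `Map`/`List` objects by some JSON library, and the class and method names (`BulkErrorSketch`, `collectErrors`) are hypothetical. The point is that each entry in `items` is inspected under whatever action key it carries (`index`, `create`, `update` or `delete`), so update results are not skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative only: collects per-item errors from an already-parsed bulk response. */
final class BulkErrorSketch {
  private BulkErrorSketch() {}

  @SuppressWarnings("unchecked")
  static List<String> collectErrors(Map<String, Object> response) {
    List<String> messages = new ArrayList<>();
    // The top-level "errors" flag is true when at least one item failed.
    if (!Boolean.TRUE.equals(response.get("errors"))) {
      return messages;
    }
    Object items = response.get("items");
    if (!(items instanceof List)) {
      return messages;
    }
    for (Object item : (List<Object>) items) {
      if (!(item instanceof Map)) {
        continue;
      }
      // Each item has a single key naming the action; look at its value
      // regardless of whether the key is "index", "create", "update" or "delete".
      for (Object result : ((Map<String, Object>) item).values()) {
        if (!(result instanceof Map)) {
          continue;
        }
        Map<String, Object> fields = (Map<String, Object>) result;
        Object error = fields.get("error");
        if (error instanceof Map) {
          Map<String, Object> details = (Map<String, Object>) error;
          messages.add(
              "Document id " + fields.get("_id") + ": "
                  + details.get("reason") + " (" + details.get("type") + ")");
        }
      }
    }
    return messages;
  }
}
```

Applied to the response quoted in the report, a helper like this would return one message for document `49952be98f4fc160f56bcdb33b1dbf7e` and none for the `noop` item, in the same "Document id ...: reason (type)" shape as the reporter's patched output.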
[jira] [Commented] (BEAM-4650) Add retry policy to Python BQ streaming sink
[ https://issues.apache.org/jira/browse/BEAM-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684735#comment-16684735 ]

Tarush Grover commented on BEAM-4650:

[~chamikara] I see the retry filter in the retry.py file: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/retry.py#L99. We want to give users an interface for customising the retry policy for BigQuery, for example by choosing which kinds of errors should be retried. Is my current understanding correct?

> Add retry policy to Python BQ streaming sink
>
> Key: BEAM-4650
> URL: https://issues.apache.org/jira/browse/BEAM-4650
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Chamikara Jayalath
> Assignee: Tarush Grover
> Priority: Major
>
> Java supports specifying a retry policy when performing streaming writes to BQ:
> [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java]
>
> We should update Python BQ streaming sink to support this as well.
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1430

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
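One possible shape for the user-facing interface discussed in this comment, sketched in Java for consistency with the rest of this archive even though the issue targets the Python SDK. Everything here is hypothetical: `RetryDecision` and its factory methods are not the Beam `InsertRetryPolicy` API, and the error reason strings used in the usage note are only examples. The idea is simply that a policy is a predicate over the error reason reported for a failed insert.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of a pluggable retry policy; not an existing Beam interface. */
interface RetryDecision {
  /** Returns true if an insert that failed with the given error reason should be retried. */
  boolean shouldRetry(String errorReason);

  /** Retry every failure. */
  static RetryDecision always() {
    return reason -> true;
  }

  /** Never retry; failed rows are surfaced to the caller immediately. */
  static RetryDecision never() {
    return reason -> false;
  }

  /** Retry everything except the reasons the user marks as persistent. */
  static RetryDecision unlessReasonIn(String... persistentReasons) {
    Set<String> persistent = new HashSet<>(Arrays.asList(persistentReasons));
    return reason -> !persistent.contains(reason);
  }
}
```

A user could then write something like `RetryDecision.unlessReasonIn("invalid", "notImplemented")` to retry transient failures while giving up on rows that can never succeed; which reasons count as persistent would be up to the sink's design.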
[jira] [Commented] (BEAM-5221) Complile error: invalid LOC header (bad signature)
[ https://issues.apache.org/jira/browse/BEAM-5221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684728#comment-16684728 ]

Shantanu Kirkire commented on BEAM-5221:

Tried that. Did not work.

> Complile error: invalid LOC header (bad signature)
>
> Key: BEAM-5221
> URL: https://issues.apache.org/jira/browse/BEAM-5221
> Project: Beam
> Issue Type: Bug
> Components: beam-model
> Affects Versions: 2.6.0
> Environment: Spring Tool Suite
> Version: 3.9.2.RELEASE
> Build Id: 201712210947
> Platform: Eclipse Oxygen.2 (4.7.2)
> Maven 3.3.9
> Reporter: Shantanu Kirkire
> Assignee: Luke Cwik
> Priority: Major
> Labels: build, compile-error, maven
>
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project linking: Compilation failure: Compilation failure:
> [ERROR] error reading C:\Users\SKirkire\.m2\repository\org\apache\beam\beam-model-pipeline\2.6.0\beam-model-pipeline-2.6.0.jar; invalid LOC header (bad signature)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
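The "invalid LOC header (bad signature)" message comes from the JDK's zip reader and usually means an archive on disk is damaged. As a hedged diagnostic aid (not something proposed in the thread), the sketch below reads every entry of a jar with `java.util.zip.ZipFile` and reports whether that succeeds. The class name `JarIntegritySketch` is hypothetical; only standard JDK calls are used.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Illustrative check: can every entry of the given jar be read to the end? */
final class JarIntegritySketch {
  private JarIntegritySketch() {}

  static boolean isReadable(Path jar) {
    byte[] buffer = new byte[8192];
    try (ZipFile zip = new ZipFile(jar.toFile())) {
      Enumeration<? extends ZipEntry> entries = zip.entries();
      while (entries.hasMoreElements()) {
        // Reading each entry forces its local header and data to be parsed.
        try (InputStream in = zip.getInputStream(entries.nextElement())) {
          while (in.read(buffer) != -1) {
            // Discard the bytes; only readability matters here.
          }
        }
      }
      return true;
    } catch (IOException e) {
      // ZipException (an IOException) is thrown for damaged archives.
      return false;
    }
  }
}
```

Running a check like this over the jars named in the Maven error can confirm whether the file in the local repository is actually unreadable before looking for other causes.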
[jira] [Assigned] (BEAM-4650) Add retry policy to Python BQ streaming sink
[ https://issues.apache.org/jira/browse/BEAM-4650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tarush Grover reassigned BEAM-4650: --- Assignee: Tarush Grover > Add retry policy to Python BQ streaming sink > > > Key: BEAM-4650 > URL: https://issues.apache.org/jira/browse/BEAM-4650 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Jayalath >Assignee: Tarush Grover >Priority: Major > > Java supports specifying a retry policy when performing streaming writes to > BQ: > [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.java] > > We should update Python BQ streaming sink to support this as well. > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1430 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=165290&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165290 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 13/Nov/18 04:31 Start Date: 13/Nov/18 04:31 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r226883017 ## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java ## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.translation; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.io.ByteStreams; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.VarInt; + +/** + * A holder object that lets you serialize and element with a Coder with minimal space wastage. Review comment: nit: ```suggestion * A holder object that lets you serialize an element with a Coder with minimal wasted space. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165290) Time Spent: 0.5h (was: 20m) > Make the spark runner not serialize data unless spark is spilling to disk > - > > Key: BEAM-5775 > URL: https://issues.apache.org/jira/browse/BEAM-5775 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Mike Kaplinskiy >Assignee: Amit Sela >Priority: Minor > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently for storage level MEMORY_ONLY, Beam does not coder-ify the data. > This lets Spark keep the data in memory avoiding the serialization round > trip. Unfortunately the logic is fairly coarse - as soon as you switch to > MEMORY_AND_DISK, Beam coder-ifys the data even though Spark might have chosen > to keep the data in memory, incurring the serialization overhead. > > Ideally Beam would serialize the data lazily - as Spark chooses to spill to > disk. 
This would be a change in behavior when using beam, but luckily Spark > has a solution for folks that want data serialized in memory - > MEMORY_AND_DISK_SER will keep the data serialized. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
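The idea in BEAM-5775, keeping the live object in memory and running the coder only when the framework actually serializes, can be sketched with plain Java serialization hooks. This is an assumption-laden illustration, not the code from pull request #6714 (which, judging by its imports, works with Kryo and `Externalizable`): `SimpleCoder`, `CountingUtf8Coder` and `LazyValue` are hypothetical names, and `SimpleCoder` is only a stand-in for a Beam `Coder`.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Beam Coder; it must itself be serializable. */
interface SimpleCoder<T> extends Serializable {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

/** Example coder that counts how often encode() runs, to make the laziness visible. */
final class CountingUtf8Coder implements SimpleCoder<String> {
  private static final long serialVersionUID = 1L;
  static final AtomicInteger ENCODE_CALLS = new AtomicInteger();

  @Override
  public byte[] encode(String value) {
    ENCODE_CALLS.incrementAndGet();
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

/** Holds the live value; the coder runs only inside writeObject/readObject. */
final class LazyValue<T> implements Serializable {
  private static final long serialVersionUID = 1L;
  private final SimpleCoder<T> coder;
  private transient T value; // never written by default serialization

  LazyValue(T value, SimpleCoder<T> coder) {
    this.value = value;
    this.coder = coder;
  }

  T get() {
    return value; // no encoding on the in-memory path
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject(); // writes the coder
    byte[] bytes = coder.encode(value); // encoding happens only here
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject(); // restores the coder
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    value = coder.decode(bytes);
  }
}
```

With a holder like this, a value that stays in memory never pays for encoding; the cost is incurred only if the holder is written to a stream, which is the behaviour the issue asks for when Spark decides to spill.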
[jira] [Updated] (BEAM-6051) Tensor flow Chicago taxi failing for 256MB data failing with disk exhausted
[ https://issues.apache.org/jira/browse/BEAM-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ankur Goenka updated BEAM-6051:

Description:
Chicago taxi example is failing when running on 1 node cluster with 256MB data.

Disk usage:
{code:java}
goenka@goenka:/home/build$ ls -lha /tmp/flink-io-b9f13afc-0c5a-40ab-9a29-037d35068c1c/
total 8.5G
drwxr-xr-x  2 goenka primarygroup 4.0K Nov 12 20:27 .
drwxrwxrwt 37 root   root          11M Nov 12 20:27 ..
-rw-r--r--  1 goenka primarygroup 2.2G Nov 12 20:27 550b3393ce2a4c35ba37135c20ebccb3.channel
-rw-r--r--  1 goenka primarygroup 2.2G Nov 12 20:27 573f0fcb90732308857025108ffc74f6.channel
-rw-r--r--  1 goenka primarygroup 2.2G Nov 12 20:27 fb08a9ea467a1045f6087088b395ea8d.channel
-rw-r--r--  1 goenka primarygroup 2.2G Nov 12 20:27 fcca6cbf619f712d852d2371d9cb7046.channel
{code}

was:
Chicago taxi example is failing when running on 1 node cluster with 256MB data.

Exception:
{code:java}
018-11-07 15:20:10,736 INFO org.apache.flink.runtime.taskmanager.Task - GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score_1/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey) (1/1) (6be52aa98e1f7233172f58eb8695fb6d) switched from RUNNING to FAILED.
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score_1/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes).
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831) Caused by: java.io.IOException: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). 
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:986) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827) 2018-11-07 15:20:10,741 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey) (1/1) java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' termi nated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum
[jira] [Created] (BEAM-6051) Tensor flow Chicago taxi failing for 256MB data failing with disk exhausted
Ankur Goenka created BEAM-6051: -- Summary: Tensor flow Chicago taxi failing for 256MB data failing with disk exhausted Key: BEAM-6051 URL: https://issues.apache.org/jira/browse/BEAM-6051 Project: Beam Issue Type: Bug Components: java-fn-execution, runner-flink Reporter: Ankur Goenka Assignee: Ankur Goenka Fix For: Not applicable Chicago taxi example is failing when running on 1 node cluster with 256MB data. Exception: {code:java} 018-11-07 15:20:10,736 INFO org.apache.flink.runtime.taskmanager.Task - GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score_1/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey) (1/1) (6be52aa98e1f7233172f58eb8695fb6d) switched from RUNNING to FAILED. java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score_1/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' ter minated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). 
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831) Caused by: java.io.IOException: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:986) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827) 2018-11-07 15:20:10,741 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey) (1/1) java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Analyze/ComputeAnalyzerOutputs[0]/Analyze[scale_to_z_score/mean_and_var/mean/sum/]/CombineGlobally(_CombineFnWrapper)/CombinePerKey/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' termi nated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831) Caused by: java.io.IOException: The record exceeds the maximum size of a sort buffer (current maximum: 4456448 bytes). at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:986) at org.apache.flink.runtime.operators.sort.Unilat
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165287 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 03:56 Start Date: 13/Nov/18 03:56 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232887391 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java ## @@ -230,23 +329,103 @@ public void close() throws Exception { private final Collector collector; private final Map outputMap; +@Nullable private final TimerReceiverFactory timerReceiverFactory; ReceiverFactory(Collector collector, Map outputMap) { + this(collector, outputMap, null); +} + +ReceiverFactory( +Collector collector, +Map outputMap, +@Nullable TimerReceiverFactory timerReceiverFactory) { this.collector = collector; this.outputMap = outputMap; + this.timerReceiverFactory = timerReceiverFactory; } @Override public FnDataReceiver create(String collectionId) { Integer unionTag = outputMap.get(collectionId); - checkArgument(unionTag != null, "Unknown PCollection id: %s", collectionId); - int tagInt = unionTag; + if (unionTag != null) { +int tagInt = unionTag; +return receivedElement -> { + synchronized (collectorLock) { +collector.collect(new RawUnionValue(tagInt, receivedElement)); + } +}; + } else if (timerReceiverFactory != null) { +// Delegate to TimerReceiverFactory +return timerReceiverFactory.create(collectionId); + } else { +throw new IllegalStateException( +String.format(Locale.ENGLISH, "Unknown PCollectionId %s", collectionId)); + } +} + } + + private static class TimerReceiverFactory implements OutputReceiverFactory { + +/** Timer PCollection id => TimerReference. 
*/ +private final HashMap timerReferenceMap; +/** Timer PCollection id => timer name => TimerSpec. */ +private final Map> timerSpecMap; + +private final BiConsumer timerDataConsumer; +private final Coder windowCoder; + +TimerReceiverFactory( +Collection timerReferenceCollection, +Map> timerSpecMap, +BiConsumer timerDataConsumer, +Coder windowCoder) { + this.timerReferenceMap = new HashMap<>(); + for (TimerReference timerReference : timerReferenceCollection) { +timerReferenceMap.put(timerReference.collection().getId(), timerReference); + } + this.timerSpecMap = timerSpecMap; + this.timerDataConsumer = timerDataConsumer; + this.windowCoder = windowCoder; +} + +@Override +public FnDataReceiver create(String pCollectionId) { + // TODO This is ugly. There should be an easier way to retrieve the timer collectionid + String timerPCollectionId = + pCollectionId.substring(0, pCollectionId.length() - ".out:0".length()); Review comment: is it worth asserting `pCollection.endsWith(".out:0")` first, as a sanity check? I'm guessing the `checkNotNull` just below will fail in such a case anyway, but the error might be clearer by catching it early. or perhaps it just can never happen so it's not worth it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165287) Time Spent: 8h 10m (was: 8h) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 8h 10m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
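The sanity check suggested in the review comment on `TimerReceiverFactory.create` could look roughly like the sketch below. It is a standalone illustration, not code from pull request #7008: `TimerIdSketch` and `stripOutputSuffix` are hypothetical names, and only the `.out:0` suffix is taken from the diff. Checking `endsWith` before calling `substring` means an unexpected id fails early with a message that names the id, instead of producing a wrong prefix or a later null check failure.

```java
/** Illustrative helper: derive the timer PCollection id by removing a required suffix. */
final class TimerIdSketch {
  private static final String OUTPUT_SUFFIX = ".out:0";

  private TimerIdSketch() {}

  static String stripOutputSuffix(String pCollectionId) {
    // Fail fast with a descriptive message if the id does not have the expected shape.
    if (!pCollectionId.endsWith(OUTPUT_SUFFIX)) {
      throw new IllegalArgumentException(
          "Expected PCollection id ending in '" + OUTPUT_SUFFIX + "' but got: " + pCollectionId);
    }
    return pCollectionId.substring(0, pCollectionId.length() - OUTPUT_SUFFIX.length());
  }
}
```

Whether the extra check is worth it depends on whether ids without the suffix can ever reach this code, which is exactly the question the reviewer raises.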
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165288&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165288 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 03:56 Start Date: 13/Nov/18 03:56 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232889550 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -359,16 +534,86 @@ public void finishBundle() { emitResults(); } catch (Exception e) { throw new RuntimeException("Failed to finish remote bundle", e); + } finally { +remoteBundle = null; + } + if (bundleFinishedCallback != null) { +bundleFinishedCallback.run(); +bundleFinishedCallback = null; } } +void setTimerKey(Object key) { + this.beforeFireTimerKey = key; +} + +boolean isBundleInProgress() { + return remoteBundle != null; +} + +void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; +} + private void emitResults() { KV result; while ((result = outputQueue.poll()) != null) { -outputManager.output(outputMap.get(result.getKey()), (WindowedValue) result.getValue()); +final String inputCollectionId = result.getKey(); +TupleTag tag = outputMap.get(inputCollectionId); +WindowedValue windowedValue = +Preconditions.checkNotNull( +(WindowedValue) result.getValue(), +"Received a null value from the SDK harness for %s", +inputCollectionId); +if (tag != null) { + // process regular elements + outputManager.output(tag, windowedValue); +} else { + // process timer elements + // TODO This is ugly. There should be an easier way to retrieve the Review comment: not sure if this is right but: ```suggestion // TODO This is ugly. 
There should be an easier way to retrieve the timer ID ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165288) Time Spent: 8h 10m (was: 8h) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 8h 10m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165289&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165289 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 03:56 Start Date: 13/Nov/18 03:56 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232888779 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -304,16 +344,126 @@ protected void addSideInputValue(StreamRecord streamRecord) { @Override protected DoFnRunner createWrappingDoFnRunner( DoFnRunner wrappedRunner) { -return new SdkHarnessDoFnRunner(); +sdkHarnessRunner = +new SdkHarnessDoFnRunner<>( +executableStage.getInputPCollection().getId(), +stageBundleFactory, +stateRequestHandler, +progressHandler, +outputManager, +outputMap, +executableStage.getTimers(), +(Coder) windowingStrategy.getWindowFn().windowCoder(), +(WindowedValue key, TimerInternals.TimerData timerData) -> { + try { +keyForTimerToBeSet = keySelector.getKey(key); +timerInternals.setTimer(timerData); + } catch (Exception e) { +throw new RuntimeException("Couldn't set timer", e); + } finally { +keyForTimerToBeSet = null; + } +}); +return sdkHarnessRunner; + } + + @Override + public void processWatermark(Watermark mark) throws Exception { +// Due to the asynchronous communication with the SDK harness, +// a bundle might still be in progress and not all items have +// yet been received from the SDk harness. If we just set this Review comment: ```suggestion // yet been received from the SDK harness. If we just set this ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165289) Time Spent: 8h 20m (was: 8h 10m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 8h 20m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165285&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165285 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 03:56 Start Date: 13/Nov/18 03:56 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232887252 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -270,8 +282,36 @@ public void setKeyContextElement1(StreamRecord record) throws Exception { @Override public void setCurrentKey(Object key) { -throw new UnsupportedOperationException( -"Current key for state backend can only be set by state requests from SDK workers."); +// We don't need to set anything, the key is set manually on the state backend +// This will be called by HeapInternalTimerService before a timer is fired +if (!usesTimers) { + throw new UnsupportedOperationException( + "Current key for state backend can only be set by state requests from SDK workers or when processing timers."); +} + } + + @Override + public Object getCurrentKey() { +// This is the key retrieved by HeapInternalTimerService when setting a Flink timer +return keyForTimerToBeSet; + } + + @Override + public void fireTimer(InternalTimer timer) { +// We need to decode the key +final ByteBuffer encodedKey = (ByteBuffer) timer.getKey(); +@SuppressWarnings("ByteBufferBackingArray") +ByteArrayInputStream byteStream = new ByteArrayInputStream(encodedKey.array()); +final Object decodedKey; +try { + decodedKey = keyCoder.decode(byteStream); +} catch (IOException e) { + throw new RuntimeException( + String.format(Locale.ENGLISH, "Failed to decode encoded key: %s", encodedKey)); Review comment: is there something friendlier than `ByteBuffer.toString` 
that's worth logging here, from `encodedKey`? testing locally: ```scala ByteBuffer.wrap("asdf".getBytes) // java.nio.HeapByteBuffer[pos=0 lim=4 cap=4] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165285) Time Spent: 7h 50m (was: 7h 40m) > Integrate support for timers using the portability APIs into Flink > -- > > Key: BEAM-4681 > URL: https://issues.apache.org/jira/browse/BEAM-4681 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Time Spent: 7h 50m > Remaining Estimate: 0h > > Consider using the code produced in BEAM-4658 to support timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4681) Integrate support for timers using the portability APIs into Flink
[ https://issues.apache.org/jira/browse/BEAM-4681?focusedWorklogId=165286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165286 ] ASF GitHub Bot logged work on BEAM-4681: Author: ASF GitHub Bot Created on: 13/Nov/18 03:56 Start Date: 13/Nov/18 03:56 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7008: [BEAM-4681] Add support for portable timers in Flink batch mode URL: https://github.com/apache/beam/pull/7008#discussion_r232890143 ## File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java ## @@ -85,17 +86,38 @@ .setInput("input") .setComponents( Components.newBuilder() + .putTransforms( + "transform", + RunnerApi.PTransform.newBuilder() + .putInputs("bla", "input") + .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PAR_DO_TRANSFORM_URN)) + .build()) .putPcollections("input", PCollection.getDefaultInstance()) .build()) + .addUserStates( + ExecutableStagePayload.UserStateId.newBuilder().setTransformId("transform").build()) .build(); private final JobInfo jobInfo = JobInfo.create("job-id", "job-name", "retrieval-token", Struct.getDefaultInstance()); @Before - public void setUpMocks() { + public void setUpMocks() throws Exception { MockitoAnnotations.initMocks(this); when(runtimeContext.getDistributedCache()).thenReturn(distributedCache); when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory); +RemoteBundle remoteBundle = Mockito.mock(RemoteBundle.class); +when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(remoteBundle); +//ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor = Review comment: intentionally leaving these here? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 165286) Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?focusedWorklogId=165279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165279 ] ASF GitHub Bot logged work on BEAM-6047: Author: ASF GitHub Bot Created on: 13/Nov/18 01:52 Start Date: 13/Nov/18 01:52 Worklog Time Spent: 10m Work Description: udim commented on issue #7013: [BEAM-6047] Clean up docker-compose artifacts URL: https://github.com/apache/beam/pull/7013#issuecomment-438099614 R: @chamikaramj This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165279) Time Spent: 1h (was: 50m) > hdfsIntegrationTest is failing due to DisallowedDatanodeException > - > > Key: BEAM-6047 > URL: https://issues.apache.org/jira/browse/BEAM-6047 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Chamikara Jayalath >Assignee: Udi Meiri >Priority: Critical > Time Spent: 1h > Remaining Estimate: 0h > > beam_PostCommit_Python_Verify is perma red due to this. 
>
> [https://scans.gradle.com/s/pqg3ent77c2pu/console-log?task=:beam-sdks-python:hdfsIntegrationTest#L1653]
>
> namenode_1_d5b2d221b2e2 | org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException: Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=717d9e8f-ff6b-449c-bc94-6deb57455f93, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-ef5ccec9-8245-45cc-9cc1-b103684e8ce1;nsid=1522131886;c=1542034438259)
>
> datanode_1_3ee1bc415f49 | 18/11/10 18:03:06 ERROR datanode.DataNode: Initialization failed for Block pool BP-15607 62326-172.18.0.2-1541872973618 (Datanode Uuid 367ac165-40b3-437f-97dc-567cb8d15da5) service to namenode/172.18.0.2:8020 Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=367ac165-40b3-437f-97dc-567cb8d15da5, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-d00d4259-9d80-4952-85d1-a0012d91842d;nsid=1271723208;c=1541872973618)
>
> Udi, any idea ?
>
> cc: [~swegner] [~altay]
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684608#comment-16684608 ] Udi Meiri edited comment on BEAM-6047 at 11/13/18 1:52 AM: --- The script was pruning old networks (BEAM-4051) but leftover containers still referenced them. This is a problem if you run `docker-compose up` twice in a row and the leftover container is reused, but it's a rare occurrence since containers get rebuilt whenever a change is made. was (Author: udim): The script pruned old networks but leftover containers still referenced them. This is a problem if you run `docker-compose up` twice in a row and the leftover container is reused, but it's a rare occurrence since containers get rebuilt whenever a change is made.
[jira] [Commented] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684608#comment-16684608 ] Udi Meiri commented on BEAM-6047: - The script pruned old networks but leftover containers still referenced them. This is a problem if you run `docker-compose up` twice in a row and the leftover container is reused, but it's a rare occurrence since containers get rebuilt whenever a change is made.
[jira] [Work logged] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?focusedWorklogId=165278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165278 ] ASF GitHub Bot logged work on BEAM-6047: Author: ASF GitHub Bot Created on: 13/Nov/18 01:41 Start Date: 13/Nov/18 01:41 Worklog Time Spent: 10m Work Description: udim commented on issue #7013: [BEAM-6047] Clean up docker-compose artifacts URL: https://github.com/apache/beam/pull/7013#issuecomment-438097430 run python postcommit Issue Time Tracking --- Worklog Id: (was: 165278) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5788) wordcount_fnapi_it failed on TestDataflowRunner because of JSON string decoding error
[ https://issues.apache.org/jira/browse/BEAM-5788?focusedWorklogId=165277&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165277 ] ASF GitHub Bot logged work on BEAM-5788: Author: ASF GitHub Bot Created on: 13/Nov/18 01:38 Start Date: 13/Nov/18 01:38 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #7017: [BEAM-5788] Update storage_v1 client and messages URL: https://github.com/apache/beam/pull/7017#issuecomment-438096992 Run Python Dataflow ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165277) Time Spent: 1h 10m (was: 1h) > wordcount_fnapi_it failed on TestDataflowRunner because of JSON string > decoding error > - > > Key: BEAM-5788 > URL: https://issues.apache.org/jira/browse/BEAM-5788 > Project: Beam > Issue Type: Sub-task > Components: test-failures >Reporter: Mark Liu >Assignee: Valentyn Tymofieiev >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > Similar to BEAM-5785, wordcount_fnapi_it failed on Python 3 when running with > TestDataflowRunner. Got TypeError: the JSON object must be str, not 'bytes'. > This error cause infinite retry before job could submitted to service. 
> More details about my env and test:
> Python version: 3.5.3
> Test: apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_fnapi_it
> Command:
> {code}
> python setup.py nosetests \
>   --tests apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_fnapi_it \
>   --nocapture \
>   --nologcapture \
>   --test-pipeline-options=" \
>     --runner=TestDataflowRunner \
>     --project= \
>     --staging_location= \
>     --temp_location= \
>     --output= \
>     --sdk_location=.../beam/sdks/python/dist/apache-beam-2.9.0.dev0.tar.gz \
>     --num_workers=1"
> {code}
> Stacktrace:
> {code}
> WARNING:root:Retry with exponential backoff: waiting for 7.661876827680761 seconds before retrying exists because we caught exception: TypeError: the JSON object must be str, not 'bytes'
> Traceback for above exception (most recent call last):
>   File ".../beam/sdks/python/apache_beam/utils/retry.py", line 184, in wrapper
>     return fun(*args, **kwargs)
>   File ".../beam/sdks/python/apache_beam/io/gcp/gcsio.py", line 375, in exists
>     self.client.objects.Get(request) # metadata
>   File ".../beam/sdks/python/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 955, in Get
>     download=download)
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py", line 722, in _RunMethod
>     return self.ProcessHttpResponse(method_config, http_response, request)
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py", line 728, in ProcessHttpResponse
>     self.__ProcessHttpResponse(method_config, http_response, request))
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py", line 611, in __ProcessHttpResponse
>     response_type, http_response.content)
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/base_api.py", line 442, in DeserializeMessage
>     message = encoding.JsonToMessage(response_type, data)
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/encoding.py", line 104, in JsonToMessage
>     return _ProtoJsonApiTools.Get().decode_message(message_type, message)
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/py/encoding.py", line 290, in decode_message
>     message_type, result)
>   File ".../tmp/virtualenvs/py3-env/lib/python3.5/site-packages/apitools/base/protorpclite/protojson.py", line 210, in decode_message
>     dictionary = json.loads(encoded_message)
>   File "/usr/lib/python3.5/json/__init__.py", line 312, in loads
>     s.__class__.__name__))
> {code}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165276&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165276 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:35 Start Date: 13/Nov/18 01:35 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016#issuecomment-438096512 Roll forward fix for ValidatesRunner tests to unbreak failing post commits. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165276) Time Spent: 1h 10m (was: 1h) > SplittableDoFnTest is failing for many runners > -- > > Key: BEAM-6050 > URL: https://issues.apache.org/jira/browse/BEAM-6050 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Chamikara Jayalath >Assignee: Luke Cwik >Priority: Critical > Time Spent: 1h 10m > Remaining Estimate: 0h > > For example, > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/2190/#showFailuresLink] > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/2180/#showFailuresLink] > > Seems to be due to following commit. > [https://github.com/apache/beam/commit/413d8524f58604a3062a8eea07c1dcd1301e2f83] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165275 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:35 Start Date: 13/Nov/18 01:35 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016 Issue Time Tracking --- Worklog Id: (was: 165275) Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165274&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165274 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:34 Start Date: 13/Nov/18 01:34 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016#issuecomment-438096304 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165274) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5788) wordcount_fnapi_it failed on TestDataflowRunner because of JSON string decoding error
[ https://issues.apache.org/jira/browse/BEAM-5788?focusedWorklogId=165273&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165273 ] ASF GitHub Bot logged work on BEAM-5788: Author: ASF GitHub Bot Created on: 13/Nov/18 01:31 Start Date: 13/Nov/18 01:31 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #7010: [BEAM-5788] Fix DataflowRunner in Python3 - response encoding URL: https://github.com/apache/beam/pull/7010#issuecomment-438095563 Update storage v1: https://github.com/apache/beam/pull/7017 Issue Time Tracking --- Worklog Id: (was: 165273) Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165272 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:30 Start Date: 13/Nov/18 01:30 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016#issuecomment-438095469 LGTM Issue Time Tracking --- Worklog Id: (was: 165272) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-5788) wordcount_fnapi_it failed on TestDataflowRunner because of JSON string decoding error
[ https://issues.apache.org/jira/browse/BEAM-5788?focusedWorklogId=165271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165271 ] ASF GitHub Bot logged work on BEAM-5788: Author: ASF GitHub Bot Created on: 13/Nov/18 01:29 Start Date: 13/Nov/18 01:29 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #7010: [BEAM-5788] Fix DataflowRunner in Python3 - response encoding URL: https://github.com/apache/beam/pull/7010#issuecomment-438095289 Regenerate those files should be the right way to do. We should update both storage_v1 and dataflow_v1b3 client and messages. Issue Time Tracking --- Worklog Id: (was: 165271) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5788) wordcount_fnapi_it failed on TestDataflowRunner because of JSON string decoding error
[ https://issues.apache.org/jira/browse/BEAM-5788?focusedWorklogId=165270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165270 ] ASF GitHub Bot logged work on BEAM-5788: Author: ASF GitHub Bot Created on: 13/Nov/18 01:28 Start Date: 13/Nov/18 01:28 Worklog Time Spent: 10m Work Description: markflyhigh commented on issue #7017: [BEAM-5788] Update storage_v1 client and messages URL: https://github.com/apache/beam/pull/7017#issuecomment-438095004 +R: @aaltay @tvalentyn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165270) Time Spent: 40m (was: 0.5h) > wordcount_fnapi_it failed on TestDataflowRunner because of JSON string > decoding error > - > > Key: BEAM-5788 > URL: https://issues.apache.org/jira/browse/BEAM-5788 > Project: Beam > Issue Type: Sub-task > Components: test-failures >Reporter: Mark Liu >Assignee: Valentyn Tymofieiev >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > Similar to BEAM-5785, wordcount_fnapi_it failed on Python 3 when running with > TestDataflowRunner. Got TypeError: the JSON object must be str, not 'bytes'. > This error causes infinite retries before the job can be submitted to the service. 
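The TypeError reported in this issue is raised by `json.loads`, which on Python 3.5 accepts only `str`; bytes input is accepted starting with Python 3.6. The following is a minimal sketch of the decoding step, assuming the HTTP response body is UTF-8 encoded JSON. `loads_compat` is a hypothetical helper name used for illustration only; it is not part of apitools or Beam, and it is not the fix applied in the pull requests above.

```python
import json


def loads_compat(payload):
    """Parse JSON from either str or bytes.

    Hypothetical helper for illustration; assumes bytes payloads are UTF-8.
    """
    if isinstance(payload, (bytes, bytearray)):
        # Python 3.5's json.loads rejects bytes, so decode to str first.
        payload = payload.decode("utf-8")
    return json.loads(payload)


# On Python 3, HTTP clients commonly return the response body as bytes.
body = b'{"name": "obj", "size": "42"}'
parsed = loads_compat(body)
print(parsed["name"])
```

Decoding at the boundary where the response is received keeps the rest of the code working with `str` on every Python 3 version.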
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5744) Investigate negative numbers represented as 'long' in Python SDK + Direct runner
[ https://issues.apache.org/jira/browse/BEAM-5744?focusedWorklogId=165261&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165261 ] ASF GitHub Bot logged work on BEAM-5744: Author: ASF GitHub Bot Created on: 13/Nov/18 01:20 Start Date: 13/Nov/18 01:20 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #6685: [BEAM-5744] URL: https://github.com/apache/beam/pull/6685#issuecomment-438093055 A recent PR would have broken one of the tests added here once merged; that PR was rolled back, so it wouldn't hurt to merge this. R: @chamikaramj This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165261) Time Spent: 2h 10m (was: 2h) > Investigate negative numbers represented as 'long' in Python SDK + Direct > runner > > > Key: BEAM-5744 > URL: https://issues.apache.org/jira/browse/BEAM-5744 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5788) wordcount_fnapi_it failed on TestDataflowRunner because of JSON string decoding error
[ https://issues.apache.org/jira/browse/BEAM-5788?focusedWorklogId=165269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165269 ] ASF GitHub Bot logged work on BEAM-5788: Author: ASF GitHub Bot Created on: 13/Nov/18 01:27 Start Date: 13/Nov/18 01:27 Worklog Time Spent: 10m Work Description: markflyhigh opened a new pull request #7017: [BEAM-5788] Update storage_v1 client and messages URL: https://github.com/apache/beam/pull/7017 Update storage_v1_client.py and storage_v1_messages.py. The new files are auto-generated with google-apitools 0.5.23, which is the minimum version Beam currently requires. Two places are not updated: - Apache License - imports This update makes the storage_v1 API support Python 3. Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
[jira] [Work logged] (BEAM-5959) Add Cloud KMS support to GCS copies
[ https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=165268&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165268 ] ASF GitHub Bot logged work on BEAM-5959: Author: ASF GitHub Bot Created on: 13/Nov/18 01:26 Start Date: 13/Nov/18 01:26 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6931: [BEAM-5959] Update GCS apitools client. URL: https://github.com/apache/beam/pull/6931#issuecomment-438094504 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165268) Time Spent: 50m (was: 40m) > Add Cloud KMS support to GCS copies > --- > > Key: BEAM-5959 > URL: https://issues.apache.org/jira/browse/BEAM-5959 > Project: Beam > Issue Type: Bug > Components: io-java-gcp, sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > Beam SDK currently uses the CopyTo GCS API call, which doesn't support > copying objects that use Customer Managed Encryption Keys (CMEK). > CMEKs are managed in Cloud KMS. > Items (for Java and Python SDKs): > - Update clients to versions that support KMS keys. > - Change copyTo API calls to use rewriteTo (Python - directly, Java - > possibly convert copyTo API call to use client library) > - Add unit tests. > - Add basic tests (DirectRunner and GCS buckets with CMEK). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5959) Add Cloud KMS support to GCS copies
[ https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=165266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165266 ] ASF GitHub Bot logged work on BEAM-5959: Author: ASF GitHub Bot Created on: 13/Nov/18 01:25 Start Date: 13/Nov/18 01:25 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6931: [BEAM-5959] Update GCS apitools client. URL: https://github.com/apache/beam/pull/6931#issuecomment-438094372 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165266) Time Spent: 40m (was: 0.5h) > Add Cloud KMS support to GCS copies > --- > > Key: BEAM-5959 > URL: https://issues.apache.org/jira/browse/BEAM-5959 > Project: Beam > Issue Type: Bug > Components: io-java-gcp, sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > Beam SDK currently uses the CopyTo GCS API call, which doesn't support > copying objects that use Customer Managed Encryption Keys (CMEK). > CMEKs are managed in Cloud KMS. > Items (for Java and Python SDKs): > - Update clients to versions that support KMS keys. > - Change copyTo API calls to use rewriteTo (Python - directly, Java - > possibly convert copyTo API call to use client library) > - Add unit tests. > - Add basic tests (DirectRunner and GCS buckets with CMEK). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165265&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165265 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:25 Start Date: 13/Nov/18 01:25 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016#issuecomment-438094360 R: @chamikaramj This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165265) Time Spent: 0.5h (was: 20m) > SplittableDoFnTest is failing for many runners > -- > > Key: BEAM-6050 > URL: https://issues.apache.org/jira/browse/BEAM-6050 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Chamikara Jayalath >Assignee: Luke Cwik >Priority: Critical > Time Spent: 0.5h > Remaining Estimate: 0h > > For example, > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/2190/#showFailuresLink] > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/2180/#showFailuresLink] > > This seems to be due to the following commit. > [https://github.com/apache/beam/commit/413d8524f58604a3062a8eea07c1dcd1301e2f83] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165264&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165264 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:25 Start Date: 13/Nov/18 01:25 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016#issuecomment-438094285 Run Spark ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165264) Time Spent: 20m (was: 10m) > SplittableDoFnTest is failing for many runners > -- > > Key: BEAM-6050 > URL: https://issues.apache.org/jira/browse/BEAM-6050 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Chamikara Jayalath >Assignee: Luke Cwik >Priority: Critical > Time Spent: 20m > Remaining Estimate: 0h > > For example, > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/2190/#showFailuresLink] > [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/2180/#showFailuresLink] > > This seems to be due to the following commit. > [https://github.com/apache/beam/commit/413d8524f58604a3062a8eea07c1dcd1301e2f83] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6050) SplittableDoFnTest is failing for many runners
[ https://issues.apache.org/jira/browse/BEAM-6050?focusedWorklogId=165263&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165263 ] ASF GitHub Bot logged work on BEAM-6050: Author: ASF GitHub Bot Created on: 13/Nov/18 01:24 Start Date: 13/Nov/18 01:24 Worklog Time Spent: 10m Work Description: lukecwik opened a new pull request #7016: [BEAM-6050] Use correct type on @ProcessElement method for SplittableDoFns URL: https://github.com/apache/beam/pull/7016 Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165263) Time Spent: 10m Remaining Estimate: 0h > SplittableDoFnTest is failing for many runners > -- > > Key: BEAM-6050 > URL: https://is
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=165257&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165257 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 13/Nov/18 01:14 Start Date: 13/Nov/18 01:14 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-438091517 R: @iemejia can you please review or forward to someone who is familiar with this code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165257) Time Spent: 20m (was: 10m) > Make the spark runner not serialize data unless spark is spilling to disk > - > > Key: BEAM-5775 > URL: https://issues.apache.org/jira/browse/BEAM-5775 > Project: Beam > Issue Type: Improvement > Components: runner-spark >Reporter: Mike Kaplinskiy >Assignee: Amit Sela >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > Currently for storage level MEMORY_ONLY, Beam does not coder-ify the data. > This lets Spark keep the data in memory avoiding the serialization round > trip. Unfortunately the logic is fairly coarse - as soon as you switch to > MEMORY_AND_DISK, Beam coder-ifys the data even though Spark might have chosen > to keep the data in memory, incurring the serialization overhead. > > Ideally Beam would serialize the data lazily - as Spark chooses to spill to > disk. This would be a change in behavior when using Beam, but luckily Spark > has a solution for folks that want data serialized in memory - > MEMORY_AND_DISK_SER will keep the data serialized. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
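The lazy-serialization idea described in this issue can be sketched in a few lines. This is only an illustration of the general technique, not the Java class from pull request #6714: `LazilyEncoded` is a hypothetical name, JSON stands in for a Beam coder, and `pickle` stands in for whatever serializer runs when data is spilled to disk.

```python
import json
import pickle


class LazilyEncoded:
    """Keeps a plain value in memory and encodes it only when serialized."""

    encode_calls = 0  # how many times a value was actually encoded

    def __init__(self, value):
        self.value = value

    def __getstate__(self):
        # pickle calls this only when the object is serialized, which in
        # this sketch plays the role of a spill to disk.
        LazilyEncoded.encode_calls += 1
        return {"encoded": json.dumps(self.value).encode("utf-8")}

    def __setstate__(self, state):
        # Decode when the object is read back.
        self.value = json.loads(state["encoded"].decode("utf-8"))


wrapped = LazilyEncoded({"word": "beam", "count": 3})
in_memory = wrapped.value                         # no encoding happens here
restored = pickle.loads(pickle.dumps(wrapped))    # encoding happens once
print(restored.value, LazilyEncoded.encode_calls)
```

Values that stay in memory never pay the encoding cost; only the ones that are actually serialized do.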
[jira] [Work logged] (BEAM-5744) Investigate negative numbers represented as 'long' in Python SDK + Direct runner
[ https://issues.apache.org/jira/browse/BEAM-5744?focusedWorklogId=165258&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165258 ] ASF GitHub Bot logged work on BEAM-5744: Author: ASF GitHub Bot Created on: 13/Nov/18 01:19 Start Date: 13/Nov/18 01:19 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #6685: [BEAM-5744] URL: https://github.com/apache/beam/pull/6685#issuecomment-438092741 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165258) Time Spent: 2h (was: 1h 50m) > Investigate negative numbers represented as 'long' in Python SDK + Direct > runner > > > Key: BEAM-5744 > URL: https://issues.apache.org/jira/browse/BEAM-5744 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5462) get rid of .options deprecation warnings in tests
[ https://issues.apache.org/jira/browse/BEAM-5462?focusedWorklogId=165260&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165260 ] ASF GitHub Bot logged work on BEAM-5462: Author: ASF GitHub Bot Created on: 13/Nov/18 01:19 Start Date: 13/Nov/18 01:19 Worklog Time Spent: 10m Work Description: ihji commented on issue #6930: [BEAM-5462] get rid of .options deprecation warnings in tests URL: https://github.com/apache/beam/pull/6930#issuecomment-438092890 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165260) Time Spent: 2.5h (was: 2h 20m) > get rid of .options deprecation warnings in tests > --- > > Key: BEAM-5462 > URL: https://issues.apache.org/jira/browse/BEAM-5462 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Heejong Lee >Priority: Minor > Time Spent: 2.5h > Remaining Estimate: 0h > > Messages look like: > {{/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py:360: > DeprecationWarning: options is deprecated since First stable release. > References to .options will not be supported}} > {{pipeline.replace_all(_get_transform_overrides(pipeline.options))}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5462) get rid of .options deprecation warnings in tests
[ https://issues.apache.org/jira/browse/BEAM-5462?focusedWorklogId=165259&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165259 ] ASF GitHub Bot logged work on BEAM-5462: Author: ASF GitHub Bot Created on: 13/Nov/18 01:19 Start Date: 13/Nov/18 01:19 Worklog Time Spent: 10m Work Description: ihji removed a comment on issue #6930: [BEAM-5462] get rid of .options deprecation warnings in tests URL: https://github.com/apache/beam/pull/6930#issuecomment-435531821 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165259) Time Spent: 2h 20m (was: 2h 10m) > get rid of .options deprecation warnings in tests > --- > > Key: BEAM-5462 > URL: https://issues.apache.org/jira/browse/BEAM-5462 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Heejong Lee >Priority: Minor > Time Spent: 2h 20m > Remaining Estimate: 0h > > Messages look like: > {{/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py:360: > DeprecationWarning: options is deprecated since First stable release. > References to .options will not be supported}} > {{pipeline.replace_all(_get_transform_overrides(pipeline.options))}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5744) Investigate negative numbers represented as 'long' in Python SDK + Direct runner
[ https://issues.apache.org/jira/browse/BEAM-5744?focusedWorklogId=165255&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165255 ] ASF GitHub Bot logged work on BEAM-5744: Author: ASF GitHub Bot Created on: 13/Nov/18 01:12 Start Date: 13/Nov/18 01:12 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6685: [BEAM-5744] URL: https://github.com/apache/beam/pull/6685#issuecomment-438090899 Please consider closing if this is not needed anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165255) Time Spent: 1h 50m (was: 1h 40m) > Investigate negative numbers represented as 'long' in Python SDK + Direct > runner > > > Key: BEAM-5744 > URL: https://issues.apache.org/jira/browse/BEAM-5744 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165254 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 13/Nov/18 01:10 Start Date: 13/Nov/18 01:10 Worklog Time Spent: 10m Work Description: HuangLED edited a comment on issue #7012: Revert "Revert "Revert "[BEAM-5299] Define max timestamp for global w… URL: https://github.com/apache/beam/pull/7012#issuecomment-438089110 > @HuangLED I have a general question on why such "Revert" PRs surface out of nowhere? I don't seem to find any email on the dev@ list about this. Which "internal" tests are breaking? Why do we need to revert changes in Beam that have been discussed at length? @lukecwik @chamikaramj @mxm Sorry for the confusion. "Internal testing" refers to internal testing of the GCP Dataflow service. I initially thought a rollback was the best option so as not to block other people. While creating this PR, though, I was also chatting with several folks about which fix would be best. We ended up doing an internal Java container release, followed by https://github.com/apache/beam/pull/7014. The breaking test issue is now gone, so we closed this PR before adding more information to it. Please let me know what else can be done, in addition to closing the PR, to clean up better afterwards in such a case. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165254) Time Spent: 14h 20m (was: 14h 10m) > Define max global window as a shared value in protos like URN enums. 
> > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 14h 20m > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165251&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165251 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 13/Nov/18 01:05 Start Date: 13/Nov/18 01:05 Worklog Time Spent: 10m Work Description: HuangLED commented on issue #7012: Revert "Revert "Revert "[BEAM-5299] Define max timestamp for global w… URL: https://github.com/apache/beam/pull/7012#issuecomment-438089110 > @HuangLED I have a general question on why such "Revert" PRs surface out of nowhere? I don't seem to find any email on the dev@ list about this. Which "internal" tests are breaking? Why do we need to revert changes in Beam that have been discussed at length? @lukecwik @chamikaramj @mxm Sorry for the confusion. I initially thought a rollback was the best option. While creating this PR, though, I was also chatting with several folks about which fix would be best. We ended up doing an internal Java container release, followed by https://github.com/apache/beam/pull/7014. The breaking test issue is now gone, so we closed this PR before adding more information to it. Please let me know what else can be done to clean up better afterwards in such a case. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165251) Time Spent: 14h 10m (was: 14h) > Define max global window as a shared value in protos like URN enums. 
> > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 14h 10m > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4663) Implement Cost calculations for Cost-Based Optimization (CBO)
[ https://issues.apache.org/jira/browse/BEAM-4663?focusedWorklogId=165249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165249 ] ASF GitHub Bot logged work on BEAM-4663: Author: ASF GitHub Bot Created on: 13/Nov/18 01:00 Start Date: 13/Nov/18 01:00 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6656: [BEAM-4663] [SQL] CBO cost calculation URL: https://github.com/apache/beam/pull/6656#issuecomment-438087967 Kai, any updates ? Please consider closing if this is not needed anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165249) Time Spent: 2h (was: 1h 50m) > Implement Cost calculations for Cost-Based Optimization (CBO) > -- > > Key: BEAM-4663 > URL: https://issues.apache.org/jira/browse/BEAM-4663 > Project: Beam > Issue Type: Sub-task > Components: dsl-sql >Reporter: Kai Jiang >Assignee: Kai Jiang >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > To support CBO, we should implement methods in each Beam*Rel.java. > computeSelfCost(...) as our first step. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165250 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 13/Nov/18 01:01 Start Date: 13/Nov/18 01:01 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232863912 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we im
[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=165248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165248 ] ASF GitHub Bot logged work on BEAM-3925: Author: ASF GitHub Bot Created on: 13/Nov/18 00:54 Start Date: 13/Nov/18 00:54 Worklog Time Spent: 10m Work Description: rangadi closed pull request #6636: [BEAM-3925] [DO NOT MERGE] KafkaIO : Value provider support for reader configuration. URL: https://github.com/apache/beam/pull/6636 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 31ba72c54ba..86c339666f4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; @@ -96,8 +97,9 @@ * {@code * pipeline * .apply(KafkaIO.read() - * .withBootstrapServers("broker_1:9092,broker_2:9092") - * .withTopic("my_topic") // use withTopics(List) to read from multiple topics. + * .withBootstrapServers(StaticValueProvider.of("broker_1:9092,broker_2:9092")) + * .withTopic(StaticValueProvider.of("my_topic")) + * .withNumSplits(10) // Sets source parallelism. Default is runner dependent. 
* .withKeyDeserializer(LongDeserializer.class) * .withValueDeserializer(StringDeserializer.class) * @@ -244,7 +246,7 @@ */ public static Read read() { return new AutoValue_KafkaIO_Read.Builder() -.setTopics(new ArrayList<>()) +.setNumSplits(0) .setTopicPartitions(new ArrayList<>()) .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) @@ -279,7 +281,11 @@ extends PTransform>> { abstract Map getConsumerConfig(); -abstract List getTopics(); +@Nullable +abstract ValueProvider getBootstrapServers(); + +@Nullable +abstract ValueProvider> getTopics(); abstract List getTopicPartitions(); @@ -313,13 +319,17 @@ abstract TimestampPolicyFactory getTimestampPolicyFactory(); +abstract int getNumSplits(); + abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setConsumerConfig(Map config); - abstract Builder setTopics(List topics); + abstract Builder setBootstrapServers(ValueProvider boostrapServers); + + abstract Builder setTopics(ValueProvider> topics); abstract Builder setTopicPartitions(List topicPartitions); @@ -348,13 +358,25 @@ abstract Builder setTimestampPolicyFactory( TimestampPolicyFactory timestampPolicyFactory); + abstract Builder setNumSplits(int numSplits); + abstract Read build(); } /** Sets the bootstrap servers for the Kafka consumer. */ public Read withBootstrapServers(String bootstrapServers) { - return updateConsumerProperties( - ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); + return withBootstrapServers(StaticValueProvider.of(bootstrapServers)); +} + +/** Sets the bootstrap servers for the Kafka consumer. */ +public Read withBootstrapServers(ValueProvider bootstrapServers) { + return toBuilder().setBootstrapServers(bootstrapServers).build(); +} + +/** ValueProvider version of {@link #withTopic(String)}. 
*/ +public Read withTopic(ValueProvider topic) { + return withTopics( + ValueProvider.NestedValueProvider.of(topic, new SingletonListTranslator<>())); } /** @@ -364,7 +386,7 @@ * partitions are distributed among the splits. */ public Read withTopic(String topic) { - return withTopics(ImmutableList.of(topic)); + return withTopic(StaticValueProvider.of(topic)); } /** @@ -374,9 +396,17 @@ * partitions are distributed among the splits. */ public Read withTopics(List topics) { + return withTopics(StaticValueProvider.of(ImmutableList.copyOf(topics))); +} + +/** + * This is a {@link ValueProvider} version of {@link #withTopics(List)}. When topic names ar
[jira] [Work logged] (BEAM-3925) Allow ValueProvider for KafkaIO
[ https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=165246&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165246 ] ASF GitHub Bot logged work on BEAM-3925: Author: ASF GitHub Bot Created on: 13/Nov/18 00:53 Start Date: 13/Nov/18 00:53 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6636: [BEAM-3925] [DO NOT MERGE] KafkaIO : Value provider support for reader configuration. URL: https://github.com/apache/beam/pull/6636#issuecomment-438086803 Please consider closing if this is not needed anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165246) Time Spent: 6h 50m (was: 6h 40m) > Allow ValueProvider for KafkaIO > --- > > Key: BEAM-3925 > URL: https://issues.apache.org/jira/browse/BEAM-3925 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Sameer Abhyankar >Assignee: Pramod Upamanyu >Priority: Major > Time Spent: 6h 50m > Remaining Estimate: 0h > > Add ValueProvider support for the various methods in KafkaIO. This would > allow us to use KafkaIO in reusable pipeline templates. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3912) Add batching support for HadoopOutputFormatIO
[ https://issues.apache.org/jira/browse/BEAM-3912?focusedWorklogId=165240&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165240 ] ASF GitHub Bot logged work on BEAM-3912: Author: ASF GitHub Bot Created on: 13/Nov/18 00:36 Start Date: 13/Nov/18 00:36 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6306: [BEAM-3912] Add HadoopOutputFormatIO support URL: https://github.com/apache/beam/pull/6306#issuecomment-438083655 any updates ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165240) Time Spent: 9h 50m (was: 9h 40m) > Add batching support for HadoopOutputFormatIO > - > > Key: BEAM-3912 > URL: https://issues.apache.org/jira/browse/BEAM-3912 > Project: Beam > Issue Type: Sub-task > Components: io-java-hadoop >Reporter: Alexey Romanenko >Assignee: Alexey Romanenko >Priority: Minor > Time Spent: 9h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684547#comment-16684547 ] Valentyn Tymofieiev commented on BEAM-5953: --- Sounds like we need to change the stage_file implementation: [https://github.com/apache/beam/blob/41bf69391fe2db032302960d2188134563d6653d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L464] Either fix the current usage of the apitools client, or reimplement it using google-cloud-storage (which can be a self-contained step towards moving away from apitools, but will add a new dependency). This is very likely related: https://issues.apache.org/jira/browse/BEAM-5502. > Support DataflowRunner on Python 3 > -- > > Key: BEAM-5953 > URL: https://issues.apache.org/jira/browse/BEAM-5953 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-5502) Object stager tests are not hermetic
[ https://issues.apache.org/jira/browse/BEAM-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valentyn Tymofieiev reassigned BEAM-5502: - Assignee: Mark Liu (was: Valentyn Tymofieiev) > Object stager tests are not hermetic > > > Key: BEAM-5502 > URL: https://issues.apache.org/jira/browse/BEAM-5502 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Assignee: Mark Liu >Priority: Major > > As per discussion in https://github.com/apache/beam/pull/6451, > test_with_setup_file fails on Python 3, however it does not fail on its own > or when only running the runners tests. It only fails when running the full > test suite, so it seems like there is a conflict with another test. > cc [~RobbeSneyders]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498 ] Mark Liu edited comment on BEAM-5953 at 11/13/18 12:09 AM: --- In order to verify the problem comes from corrupted proto file, I did two things: 1) dump proto into local file before staging and read it same way as DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) replace existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with google-cloud-storage library: {code:python} # Upload pipline.pb via google-cloud-storage import tempfile temp_file = tempfile.NamedTemporaryFile() with open(temp_file.name, 'wb') as f: f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read()) gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb') bucket_name, name = gcs_location[5:].split('/', 1) from google.cloud import storage client = storage.Client(project='my-project') bucket = client.get_bucket(bucket_name) blob = bucket.blob(name) blob.upload_from_filename(temp_file.name) logging.info('Starting GCS upload to %s...', gcs_location) {code} With the code change, runner harness started successfully. see [this job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe] It could be another case to support moving off google-apitools from SDK (https://issues.apache.org/jira/browse/BEAM-4850) was (Author: markflyhigh): In order to verify the problem comes from corrupted proto file, I did two things: 1) dump proto into local file before staging and read it same way as DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 
2) replace existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with google-cloud-storage library: {code:python} # Upload pipline.pb via google-cloud-storage import tempfile temp_file = tempfile.NamedTemporaryFile() with open(temp_file.name, 'wb') as f: f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read()) gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb') bucket_name, name = gcs_location[5:].split('/', 1) from google.cloud import storage client = storage.Client(project='my-project') bucket = client.get_bucket(bucket_name) blob = bucket.blob(name) blob.upload_from_filename(temp_file.name) logging.info('Starting GCS upload to %s...', gcs_location) {code} With the code change, runner harness started successfully. It could be another case to support moving off google-apitools from SDK (https://issues.apache.org/jira/browse/BEAM-4850) > Support DataflowRunner on Python 3 > -- > > Key: BEAM-5953 > URL: https://issues.apache.org/jira/browse/BEAM-5953 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
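The upload step described in the comment above can be sketched as a small standalone helper. This is only a minimal sketch under stated assumptions, not the SDK's implementation: the names `split_gcs_path` and `upload_bytes` are hypothetical, the staging location is assumed to have the form `gs://<bucket>/<object>`, and the upload function assumes the `google-cloud-storage` package is installed.

```python
def split_gcs_path(gcs_path):
    """Split 'gs://bucket/some/object' into ('bucket', 'some/object').

    Hypothetical helper: it does the same job as the
    gcs_location[5:].split('/', 1) slice in the comment, but checks the
    scheme explicitly instead of assuming a five-character prefix.
    """
    prefix = 'gs://'
    if not gcs_path.startswith(prefix):
        raise ValueError('Expected a gs:// path, got %r' % gcs_path)
    bucket, sep, name = gcs_path[len(prefix):].partition('/')
    if not bucket or not sep or not name:
        raise ValueError('Expected gs://<bucket>/<object>, got %r' % gcs_path)
    return bucket, name


def upload_bytes(data, gcs_path, project=None):
    """Upload a bytes payload to gcs_path via google-cloud-storage.

    Assumption: the caller has credentials configured for the project.
    """
    # Imported lazily so split_gcs_path works without the library installed.
    from google.cloud import storage

    bucket_name, name = split_gcs_path(gcs_path)
    client = storage.Client(project=project)
    blob = client.bucket(bucket_name).blob(name)
    # Upload straight from memory; no temporary file is needed.
    blob.upload_from_string(data, content_type='application/octet-stream')
```

With a helper like this, the serialized pipeline (`job.proto_pipeline.SerializeToString()` in the comment) could be passed directly as `data`, which would avoid the temporary file and the `io.BytesIO` round trip.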
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165230&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165230 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 13/Nov/18 00:16 Start Date: 13/Nov/18 00:16 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #7014: [BEAM-5299] Update worker container version to most recent release. URL: https://github.com/apache/beam/pull/7014#issuecomment-438079798 LGTM. Merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165230) Time Spent: 13h 50m (was: 13h 40m) > Define max global window as a shared value in protos like URN enums. > > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 13h 50m > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165231 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 13/Nov/18 00:16 Start Date: 13/Nov/18 00:16 Worklog Time Spent: 10m Work Description: chamikaramj closed pull request #7014: [BEAM-5299] Update worker container version to most recent release. URL: https://github.com/apache/beam/pull/7014 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index ab18b5381ee..d7b6081e8bf 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -39,7 +39,7 @@ processResources { filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [ 'dataflow.legacy_environment_major_version' : '7', 'dataflow.fnapi_environment_major_version' : '7', -'dataflow.container_version' : 'beam-master-20180730' +'dataflow.container_version' : 'beam-master-20181112' ] } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165231) Time Spent: 14h (was: 13h 50m) > Define max global window as a shared value in protos like URN enums. 
> > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 14h > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498 ] Mark Liu edited comment on BEAM-5953 at 11/13/18 12:12 AM: --- In order to verify the problem comes from corrupted proto file, I did two things: 1) dump proto into local file before staging and read it same way as DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) replace existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with google-cloud-storage library: {code:python} # Upload pipline.pb via google-cloud-storage import tempfile temp_file = tempfile.NamedTemporaryFile() with open(temp_file.name, 'wb') as f: f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read()) gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb') bucket_name, name = gcs_location[5:].split('/', 1) from google.cloud import storage client = storage.Client(project='my-project') bucket = client.get_bucket(bucket_name) blob = bucket.blob(name) blob.upload_from_filename(temp_file.name) logging.info('Starting GCS upload to %s...', gcs_location) {code} With the code change, runner harness started successfully. see [this job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe] (the job was hanging since sdk harness startup failed, not related to this error. 
[harness-startup log|https://pantheon.corp.google.com/logs/viewer?resource=dataflow_step%2Fjob_id%2F2018-11-12_15_25_55-9696772999324855617&logName=projects%2Fgoogle.com:clouddfe%2Flogs%2Fdataflow.googleapis.com%252Fharness-startup&interval=NO_LIMIT&project=google.com:clouddfe&minLogLevel=0&expandAll=false&timestamp=2018-11-13T00:11:14.44900Z&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2018-11-12T23:26:40.731883000Z] shows the successful messages). It could be another case to support moving off google-apitools from SDK (https://issues.apache.org/jira/browse/BEAM-4850) was (Author: markflyhigh): In order to verify the problem comes from corrupted proto file, I did two things: 1) dump proto into local file before staging and read it same way as DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) replace existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with google-cloud-storage library: {code:python} # Upload pipline.pb via google-cloud-storage import tempfile temp_file = tempfile.NamedTemporaryFile() with open(temp_file.name, 'wb') as f: f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read()) gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb') bucket_name, name = gcs_location[5:].split('/', 1) from google.cloud import storage client = storage.Client(project='my-project') bucket = client.get_bucket(bucket_name) blob = bucket.blob(name) blob.upload_from_filename(temp_file.name) logging.info('Starting GCS upload to %s...', gcs_location) {code} With the code change, runner harness started successfully. see [this job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe]. 
It could be another case to support moving off google-apitools from SDK (https://issues.apache.org/jira/browse/BEAM-4850) > Support DataflowRunner on Python 3 > -- > > Key: BEAM-5953 > URL: https://issues.apache.org/jira/browse/BEAM-5953 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498 ] Mark Liu edited comment on BEAM-5953 at 11/13/18 12:09 AM: --- In order to verify the problem comes from corrupted proto file, I did two things: 1) dump proto into local file before staging and read it same way as DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) replace existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with google-cloud-storage library: {code:python} # Upload pipline.pb via google-cloud-storage import tempfile temp_file = tempfile.NamedTemporaryFile() with open(temp_file.name, 'wb') as f: f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read()) gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb') bucket_name, name = gcs_location[5:].split('/', 1) from google.cloud import storage client = storage.Client(project='my-project') bucket = client.get_bucket(bucket_name) blob = bucket.blob(name) blob.upload_from_filename(temp_file.name) logging.info('Starting GCS upload to %s...', gcs_location) {code} With the code change, runner harness started successfully. see [this job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe]. It could be another case to support moving off google-apitools from SDK (https://issues.apache.org/jira/browse/BEAM-4850) was (Author: markflyhigh): In order to verify the problem comes from corrupted proto file, I did two things: 1) dump proto into local file before staging and read it same way as DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 
2) replace existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with google-cloud-storage library: {code:python} # Upload pipline.pb via google-cloud-storage import tempfile temp_file = tempfile.NamedTemporaryFile() with open(temp_file.name, 'wb') as f: f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read()) gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb') bucket_name, name = gcs_location[5:].split('/', 1) from google.cloud import storage client = storage.Client(project='my-project') bucket = client.get_bucket(bucket_name) blob = bucket.blob(name) blob.upload_from_filename(temp_file.name) logging.info('Starting GCS upload to %s...', gcs_location) {code} With the code change, runner harness started successfully. see [this job|https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-11-12_15_25_55-9696772999324855617?project=google.com:clouddfe] It could be another case to support moving off google-apitools from SDK (https://issues.apache.org/jira/browse/BEAM-4850) > Support DataflowRunner on Python 3 > -- > > Key: BEAM-5953 > URL: https://issues.apache.org/jira/browse/BEAM-5953 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=165218&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165218 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 12/Nov/18 23:52 Start Date: 12/Nov/18 23:52 Worklog Time Spent: 10m Work Description: ihji commented on issue #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#issuecomment-438074887 @chamikaramj, @udim: Addressed most of the issues raised in the initial code review. Waiting for additional feedback. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165218) Time Spent: 9h 20m (was: 9h 10m) > Parquet IO for Python SDK > - > > Key: BEAM- > URL: https://issues.apache.org/jira/browse/BEAM- > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 9h 20m > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498 ] Mark Liu edited comment on BEAM-5953 at 11/12/18 11:50 PM: --- To verify that the problem comes from a corrupted proto file, I did two things: 1) Dumped the proto into a local file before staging and read it the same way DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) Replaced the existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with the google-cloud-storage library:
{code:python}
# Upload pipeline.pb via google-cloud-storage
import tempfile
temp_file = tempfile.NamedTemporaryFile()
with open(temp_file.name, 'wb') as f:
  f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read())
gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb')
bucket_name, name = gcs_location[5:].split('/', 1)

from google.cloud import storage
client = storage.Client(project='my-project')
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(name)
blob.upload_from_filename(temp_file.name)
logging.info('Starting GCS upload to %s...', gcs_location)
{code}
With the code change, the runner harness started successfully. This could be another argument for moving the SDK off google-apitools (https://issues.apache.org/jira/browse/BEAM-4850) was (Author: markflyhigh): To verify that the problem comes from a corrupted proto file, I did two things: 1) Dumped the proto into a local file before staging and read it the same way DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) Replaced the existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with the google-cloud-storage library:
{code:python}
# Upload pipeline.pb via google-cloud-storage
import tempfile
temp_file = tempfile.NamedTemporaryFile()
with open(temp_file.name, 'wb') as f:
  f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read())
gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb')
bucket_name, name = gcs_location[5:].split('/', 1)

from google.cloud import storage
client = storage.Client(project='google.com:clouddfe')
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(name)
blob.upload_from_filename(temp_file.name)
logging.info('Starting GCS upload to %s...', gcs_location)
{code}
With the code change, the runner harness started successfully. This could be another argument for moving the SDK off google-apitools (https://issues.apache.org/jira/browse/BEAM-4850) > Support DataflowRunner on Python 3 > -- > > Key: BEAM-5953 > URL: https://issues.apache.org/jira/browse/BEAM-5953 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684498#comment-16684498 ] Mark Liu commented on BEAM-5953: To verify that the problem comes from a corrupted proto file, I did two things: 1) Dumped the proto into a local file before staging and read it the same way DataflowWorkerHarnessHelper.java does. The proto can be printed successfully. 2) Replaced the existing upload approach ([this line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L523]) with the google-cloud-storage library:
{code:python}
# Upload pipeline.pb via google-cloud-storage
import tempfile
temp_file = tempfile.NamedTemporaryFile()
with open(temp_file.name, 'wb') as f:
  f.write(io.BytesIO(job.proto_pipeline.SerializeToString()).read())
gcs_location = FileSystems.join(job.google_cloud_options.staging_location, 'pipeline.pb')
bucket_name, name = gcs_location[5:].split('/', 1)

from google.cloud import storage
client = storage.Client(project='google.com:clouddfe')
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(name)
blob.upload_from_filename(temp_file.name)
logging.info('Starting GCS upload to %s...', gcs_location)
{code}
With the code change, the runner harness started successfully. This could be another argument for moving the SDK off google-apitools (https://issues.apache.org/jira/browse/BEAM-4850) > Support DataflowRunner on Python 3 > -- > > Key: BEAM-5953 > URL: https://issues.apache.org/jira/browse/BEAM-5953 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Mark Liu >Assignee: Mark Liu >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
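The snippet in the comment above obtains the bucket and object name by slicing the first five characters off the staging location, which silently assumes a `gs://` prefix. A minimal sketch of that one step in isolation, assuming the location always has the form `gs://<bucket>/<object>`; the helper name `split_gcs_path` is hypothetical and is not part of Beam or google-cloud-storage:

```python
def split_gcs_path(gcs_location):
  """Split 'gs://bucket/path/to/object' into (bucket_name, object_name).

  Hypothetical helper for illustration only. It mirrors
  gcs_location[5:].split('/', 1) from the comment, but checks the
  'gs://' prefix instead of assuming it.
  """
  prefix = 'gs://'
  if not gcs_location.startswith(prefix):
    raise ValueError('Expected a gs:// location, got %r' % gcs_location)
  # partition() splits on the first '/', leaving the rest of the path intact.
  bucket_name, sep, name = gcs_location[len(prefix):].partition('/')
  if not bucket_name or not sep or not name:
    raise ValueError('Expected gs://<bucket>/<object>, got %r' % gcs_location)
  return bucket_name, name


if __name__ == '__main__':
  print(split_gcs_path('gs://my-bucket/staging/pipeline.pb'))
```

Validating the prefix turns a malformed staging location into an immediate error rather than a confusing bucket lookup failure later in the upload.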
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=165217&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165217 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 12/Nov/18 23:47 Start Date: 12/Nov/18 23:47 Worklog Time Spent: 10m Work Description: ihji commented on issue #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#issuecomment-438073834 @udim: Python 3 test compatibility is blocked by an unorderable-types issue similar to BEAM-5621. `equal_to` and several other testing methods, such as `assert_reentrant_reads_succeed`, call `sorted` on their input values before comparing them. `parquetio_test` has multiple test cases in which the element type is a dictionary, and dictionaries are unorderable in Python 3. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165217) Time Spent: 9h 10m (was: 9h) > Parquet IO for Python SDK > - > > Key: BEAM- > URL: https://issues.apache.org/jira/browse/BEAM- > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 9h 10m > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
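The ordering failure described in the comment above can be reproduced without Beam. A minimal sketch, assuming the elements are flat dictionaries whose keys and values are themselves mutually comparable; the helper `sort_key` is hypothetical and does not reflect how `equal_to` or the fix for this issue is actually implemented:

```python
def sort_key(element):
  """Return an orderable stand-in for an element (hypothetical helper).

  Dictionaries are mapped to a sorted list of (key, value) pairs, which
  Python 3 can compare; other elements are returned unchanged.
  """
  if isinstance(element, dict):
    return sorted(element.items())
  return element


records = [{'name': 'b', 'n': 2}, {'name': 'a', 'n': 1}]

# In Python 3, dict does not define '<', so a plain sorted() call raises.
try:
  sorted(records)
  plain_sort_failed = False
except TypeError as e:
  plain_sort_failed = True
  print('sorted() failed:', e)

# Supplying an explicit key makes the comparison well defined.
ordered = sorted(records, key=sort_key)
print(ordered)
```

The key function only works when every dictionary's items can be compared with each other, so mixed value types within the same key would still raise `TypeError`.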
[jira] [Created] (BEAM-6050) SplittableDoFnTest is failing for many runners
Chamikara Jayalath created BEAM-6050: Summary: SplittableDoFnTest is failing for many runners Key: BEAM-6050 URL: https://issues.apache.org/jira/browse/BEAM-6050 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Chamikara Jayalath Assignee: Luke Cwik For example, [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/2190/#showFailuresLink] [https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/2180/#showFailuresLink] This seems to be due to the following commit: [https://github.com/apache/beam/commit/413d8524f58604a3062a8eea07c1dcd1301e2f83] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?focusedWorklogId=165214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165214 ] ASF GitHub Bot logged work on BEAM-6047: Author: ASF GitHub Bot Created on: 12/Nov/18 23:42 Start Date: 12/Nov/18 23:42 Worklog Time Spent: 10m Work Description: udim commented on issue #7013: [BEAM-6047] DO NOT MERGE: listing containers URL: https://github.com/apache/beam/pull/7013#issuecomment-438072162 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165214) Time Spent: 40m (was: 0.5h) > hdfsIntegrationTest is failing due to DisallowedDatanodeException > - > > Key: BEAM-6047 > URL: https://issues.apache.org/jira/browse/BEAM-6047 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures >Reporter: Chamikara Jayalath >Assignee: Udi Meiri >Priority: Critical > Time Spent: 40m > Remaining Estimate: 0h > > beam_PostCommit_Python_Verify is perma red due to this. 
> > [https://scans.gradle.com/s/pqg3ent77c2pu/console-log?task=:beam-sdks-python:hdfsIntegrationTest#L1653] > > namenode_1_d5b2d221b2e2 | > org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException: Datanode > denied communication with > namenode because hostname cannot be resolved (ip=172.18.0.3, > hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, > datanodeUuid=717d9e8f-ff6b-449c-bc94-6deb57455f93, infoPort=50075, > infoSecurePort=0, ipcPort=50020, > storageInfo=lv=-57;cid=CID-ef5ccec9-8245-45cc-9cc1-b103684e8ce1;nsid=1522131886;c=1542034438259) > > > datanode_1_3ee1bc415f49 | 18/11/10 18:03:06 ERROR datanode.DataNode: > Initialization failed for Block pool BP-15607 > 62326-172.18.0.2-1541872973618 (Datanode Uuid > 367ac165-40b3-437f-97dc-567cb8d15da5) service to namenode/172.18.0.2:8020 > Datanode denied communication with namenode because hostname cannot be > resolved (ip=172.18.0.3, hostname=172.18.0.3): > DatanodeRegistration(0.0.0.0:50010, > datanodeUuid=367ac165-40b3-437f-97dc-567cb8d15da5, infoPort=50075, > infoSecurePort=0, ipcPort=50020, > storageInfo=lv=-57;cid=CID-d00d4259-9d80-4952-85d1-a0012d91842d;nsid=1271723208;c=1541872973618) > > Udi, any idea? > > cc: [~swegner] [~altay] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=165208&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165208 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 12/Nov/18 23:27 Start Date: 12/Nov/18 23:27 Worklog Time Spent: 10m Work Description: ihji removed a comment on issue #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#issuecomment-438038046 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165208) Time Spent: 8h 50m (was: 8h 40m) > Parquet IO for Python SDK > - > > Key: BEAM- > URL: https://issues.apache.org/jira/browse/BEAM- > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 8h 50m > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165213 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 23:31 Start Date: 12/Nov/18 23:31 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232848037 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we im
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165211&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165211 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 23:30 Start Date: 12/Nov/18 23:30 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232847785 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we im
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165212 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 23:30 Start Date: 12/Nov/18 23:30 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232847804 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we im
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=165209&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165209 ] ASF GitHub Bot logged work on BEAM-: Author: ASF GitHub Bot Created on: 12/Nov/18 23:27 Start Date: 12/Nov/18 23:27 Worklog Time Spent: 10m Work Description: ihji commented on issue #6763: [BEAM-] Parquet IO for Python SDK URL: https://github.com/apache/beam/pull/6763#issuecomment-438068248 run python postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165209) Time Spent: 9h (was: 8h 50m) > Parquet IO for Python SDK > - > > Key: BEAM- > URL: https://issues.apache.org/jira/browse/BEAM- > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Bruce Arctor >Assignee: Heejong Lee >Priority: Major > Time Spent: 9h > Remaining Estimate: 0h > > Add Parquet Support for the Python SDK. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165195&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165195 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 22:59 Start Date: 12/Nov/18 22:59 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232841497 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165195) Time Spent: 6h 40m (was: 6.5h) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-e
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165197&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165197 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 12/Nov/18 23:00 Start Date: 12/Nov/18 23:00 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #7014: [BEAM-5299] Update worker container version to most recent release. URL: https://github.com/apache/beam/pull/7014#issuecomment-438059820 Run Dataflow ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165197) Time Spent: 13h 40m (was: 13.5h) > Define max global window as a shared value in protos like URN enums. > > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 13h 40m > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165196&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165196 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 22:59 Start Date: 12/Nov/18 22:59 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232841506 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165196) Time Spent: 6h 50m (was: 6h 40m) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 6h 50m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165194&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165194 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 22:58 Start Date: 12/Nov/18 22:58 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232841329 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165194) Time Spent: 6.5h (was: 6h 20m) > Make process, finish and start run on the same thread to support metrics. 
> - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 6.5h > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consumer then > and call the element processing receiver in blockTillReadFinishes -- This message was sent by Atlassian JIRA (v7.6.3#76005)
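On the TODO in the quoted diff above ("does this need to be a concurrent hash map (set doesn't seem to exist)"): the JDK has no `ConcurrentHashSet` class, but since Java 8 `ConcurrentHashMap.newKeySet()` returns a thread-safe `Set`. The following is a minimal sketch, assuming the set really is touched from more than one thread, which the review thread does not confirm. `Handle` is a hypothetical stand-in for `InboundDataClient`, and `registerConcurrently` is an illustrative name, not part of the pull request.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentSetSketch {

  /** Hypothetical stand-in for InboundDataClient; not a Beam class. */
  static final class Handle {
    final int id;

    Handle(int id) {
      this.id = id;
    }
  }

  /** Adds {@code count} distinct handles from several threads and returns the set size. */
  static int registerConcurrently(int count) throws InterruptedException {
    // Thread-safe Set backed by a ConcurrentHashMap (available since Java 8).
    Set<Handle> handles = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < count; i++) {
      final int id = i;
      pool.execute(() -> handles.add(new Handle(id)));
    }
    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("registration tasks did not finish in time");
    }
    return handles.size();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("registered handles: " + registerConcurrently(1000));
  }
}
```

If every `add` happens on a single thread, a plain `HashSet` is sufficient and the concurrent variant only adds overhead.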
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165190&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165190 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 22:54 Start Date: 12/Nov/18 22:54 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232840319 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165190) Time Spent: 6h 20m (was: 6h 10m) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 6h 20m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consumer then > and call the element processing receiver in blockTillReadFinishes -- This message was sent by Atlassian JIRA (v7.6.3#76005)
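The class Javadoc quoted above describes queuing inbound elements so that they are processed on the thread that calls `drainAndBlock`. The body of that method is not shown in this message, so the following is only a sketch of the general pattern under stated assumptions: a `SynchronousQueue` hand-off, a polling loop with a timeout, and an `AtomicBoolean` completion flag. The `accept`, `markDone`, and `demo` names and the 50 ms timeout are illustrative and are not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class QueueAndDrainSketch {
  private final SynchronousQueue<String> queue = new SynchronousQueue<>();
  private final AtomicBoolean producerDone = new AtomicBoolean(false);

  /** Called on the producer thread; blocks until the draining thread takes the element. */
  void accept(String element) throws InterruptedException {
    queue.put(element);
  }

  /** Called on the producer thread after its last element has been handed off. */
  void markDone() {
    producerDone.set(true);
  }

  /** Runs the consumer on the calling thread until the producer has finished. */
  void drainAndBlock(Consumer<String> consumer) throws InterruptedException {
    while (true) {
      String element = queue.poll(50, TimeUnit.MILLISECONDS);
      if (element != null) {
        consumer.accept(element);
      } else if (producerDone.get()) {
        // Every put() has already returned, so nothing is left to hand off.
        return;
      }
    }
  }

  /** Feeds three elements from a second thread and processes them on the caller's thread. */
  static List<String> demo() throws InterruptedException {
    QueueAndDrainSketch sketch = new QueueAndDrainSketch();
    Thread caller = Thread.currentThread();
    Thread producer =
        new Thread(
            () -> {
              try {
                for (String s : new String[] {"a", "b", "c"}) {
                  sketch.accept(s);
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              } finally {
                sketch.markDone();
              }
            });
    producer.start();
    List<String> processed = new ArrayList<>();
    sketch.drainAndBlock(
        element -> {
          if (Thread.currentThread() != caller) {
            throw new IllegalStateException("element processed on the wrong thread");
          }
          processed.add(element);
        });
    producer.join();
    return processed;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo());
  }
}
```

Because processing happens on the caller's thread, any thread-local state set up before the drain loop (such as a metrics container, which is the motivation stated in the issue title) is visible while each element is processed.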
[jira] [Commented] (BEAM-5221) Complile error: invalid LOC header (bad signature)
[ https://issues.apache.org/jira/browse/BEAM-5221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684453#comment-16684453 ] Luke Cwik commented on BEAM-5221: - Shantanu, have you tried deleting C:\Users\SKirkire\.m2\repository\org\apache\beam\? The M2 repository has been known to corrupt itself in the past. > Complile error: invalid LOC header (bad signature) > --- > > Key: BEAM-5221 > URL: https://issues.apache.org/jira/browse/BEAM-5221 > Project: Beam > Issue Type: Bug > Components: beam-model >Affects Versions: 2.6.0 > Environment: Spring Tool Suite > Version: 3.9.2.RELEASE > Build Id: 201712210947 > Platform: Eclipse Oxygen.2 (4.7.2) > Maven 3.3.9 >Reporter: Shantanu Kirkire >Assignee: Luke Cwik >Priority: Major > Labels: build, compile-error, maven > > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) > on project linking: Compilation failure: Compilation failure: > [ERROR] error reading > C:\Users\SKirkire\.m2\repository\org\apache\beam\beam-model-pipeline\2.6.0\beam-model-pipeline-2.6.0.jar; > invalid LOC header (bad signature) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
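Before deleting the cached artifacts as suggested above, it can help to confirm which jar is actually damaged. The sketch below assumes the failure comes from a truncated or corrupt archive in the local Maven repository. It uses only the JDK's `java.util.zip` API and tries to read every entry to the end; the class and method names are illustrative, and the jar paths are expected as command-line arguments.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class JarCheckSketch {

  /** Returns true if the archive opens and every entry can be read to the end. */
  static boolean isReadable(Path jar) {
    byte[] buffer = new byte[8192];
    try (ZipFile zip = new ZipFile(jar.toFile())) {
      Enumeration<? extends ZipEntry> entries = zip.entries();
      while (entries.hasMoreElements()) {
        try (InputStream in = zip.getInputStream(entries.nextElement())) {
          while (in.read(buffer) != -1) {
            // Discard the bytes; only the ability to read them matters here.
          }
        }
      }
      return true;
    } catch (IOException e) {
      // ZipException is a subclass of IOException, so corrupt archives land here.
      return false;
    }
  }

  public static void main(String[] args) {
    for (String arg : args) {
      Path jar = Paths.get(arg);
      System.out.println((isReadable(jar) ? "OK      " : "CORRUPT ") + jar);
    }
  }
}
```

A jar reported as corrupt can then be deleted on its own, and the next build should download it again.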
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165189&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165189 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 22:49 Start Date: 12/Nov/18 22:49 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232839353 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ## @@ -227,6 +229,8 @@ private void createRunnerAndConsumersForPTransformRecursively( ProcessBundleResponse.Builder response = ProcessBundleResponse.newBuilder(); +boolean hasSinkPtransform = false; Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165189) Time Spent: 6h 10m (was: 6h) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consumer then > and call the element processing receiver in blockTillReadFinishes -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165188&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165188 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 12/Nov/18 22:48 Start Date: 12/Nov/18 22:48 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #7014: [BEAM-5299] Update worker container version to most recent release. URL: https://github.com/apache/beam/pull/7014#issuecomment-438056184 R: @lukecwik This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165188) Time Spent: 13.5h (was: 13h 20m) > Define max global window as a shared value in protos like URN enums. > > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 13.5h > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165187&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165187 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 12/Nov/18 22:46 Start Date: 12/Nov/18 22:46 Worklog Time Spent: 10m Work Description: tweise commented on issue #7012: Revert "Revert "Revert "[BEAM-5299] Define max timestamp for global w… URL: https://github.com/apache/beam/pull/7012#issuecomment-438055514 @HuangLED I have a general question on why such "Revert" PRs surface out of nowhere? I don't seem to find any email on the dev@ list about this. Which "internal" tests are breaking? Why do we need to revert changes in Beam that have been discussed at length? @lukecwik @chamikaramj @mxm This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165187) Time Spent: 13h 20m (was: 13h 10m) > Define max global window as a shared value in protos like URN enums. > > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 13h 20m > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-5446) SplittableDoFn: Remove runner time execution information from public API surface
[ https://issues.apache.org/jira/browse/BEAM-5446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Cwik resolved BEAM-5446. - Resolution: Fixed Fix Version/s: 2.9.0 > SplittableDoFn: Remove runner time execution information from public API > surface > > > Key: BEAM-5446 > URL: https://issues.apache.org/jira/browse/BEAM-5446 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Luke Cwik >Assignee: Luke Cwik >Priority: Minor > Fix For: 2.9.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Move the setting of "claim observers" within RestrictionTracker to another > location to clean up the RestrictionTracker interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165186&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165186 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 12/Nov/18 22:43 Start Date: 12/Nov/18 22:43 Worklog Time Spent: 10m Work Description: HuangLED commented on issue #7014: [BEAM-5299] Update worker container version to most recent release. URL: https://github.com/apache/beam/pull/7014#issuecomment-438054436 R: @chamikaramj @alanmyrvold This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165186) Time Spent: 13h 10m (was: 13h) > Define max global window as a shared value in protos like URN enums. > > > Key: BEAM-5299 > URL: https://issues.apache.org/jira/browse/BEAM-5299 > Project: Beam > Issue Type: Improvement > Components: beam-model, sdk-go, sdk-java-core, sdk-py-core >Reporter: Luke Cwik >Assignee: Maximilian Michels >Priority: Minor > Labels: portability > Fix For: 2.9.0 > > Time Spent: 13h 10m > Remaining Estimate: 0h > > Instead of having each language define a max timestamp themselves, define the > max timestamps within proto to be shared across different languages. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3608) Vendor Guava
[ https://issues.apache.org/jira/browse/BEAM-3608?focusedWorklogId=165184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165184 ] ASF GitHub Bot logged work on BEAM-3608: Author: ASF GitHub Bot Created on: 12/Nov/18 22:42 Start Date: 12/Nov/18 22:42 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #6809: [BEAM-3608] Vendor guava 20.0 URL: https://github.com/apache/beam/pull/6809 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 18543ced40d..b59925d9aa9 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -270,34 +270,8 @@ class BeamModulePlugin implements Plugin { project.version += '-SNAPSHOT' } -project.repositories { - maven { url project.offlineRepositoryRoot } - - // To run gradle in offline mode, one must first invoke - // 'updateOfflineRepository' to create an offline repo - // inside the root project directory. See the application - // of the offline repo plugin within build_rules.gradle - // for further details. - if (project.gradle.startParameter.isOffline()) { -return - } - - mavenCentral() - mavenLocal() - jcenter() - - // Spring for resolving pentaho dependency. 
- maven { url "https://repo.spring.io/plugins-release/" } - - // Release staging repository - maven { url "https://oss.sonatype.org/content/repositories/staging/" } - - // Apache nightly snapshots - maven { url "https://repository.apache.org/snapshots" } - - // Apache release snapshots - maven { url "https://repository.apache.org/content/repositories/releases" } -} +// Register all Beam repositories and configuration tweaks +Repositories.register(project) // Apply a plugin which enables configuring projects imported into Intellij. project.apply plugin: "idea" @@ -613,28 +587,6 @@ class BeamModulePlugin implements Plugin { } project.artifacts.archives project.packageTests - // Apply a plugin which provides the 'updateOfflineRepository' task that creates an offline - // repository. This offline repository satisfies all Gradle build dependencies and Java - // project dependencies. The offline repository is placed within $rootDir/offline-repo - // but can be overridden by specifying '-PofflineRepositoryRoot=/path/to/repo'. - // Note that parallel build must be disabled when executing 'updateOfflineRepository' - // by specifying '--no-parallel', see - // https://github.com/mdietrichstein/gradle-offline-dependencies-plugin/issues/3 - project.apply plugin: "io.pry.gradle.offline_dependencies" - project.offlineDependencies { -repositories { - mavenLocal() - mavenCentral() - jcenter() - maven { url "https://plugins.gradle.org/m2/" } - maven { url "http://repo.spring.io/plugins-release" } - maven { url project.offlineRepositoryRoot } -} -includeSources = false -includeJavadocs = false -includeIvyXmls = false - } - // Configures annotation processing for commonly used annotation processors // across all Java projects. 
project.apply plugin: "net.ltgt.apt" diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy new file mode 100644 index 000..8840d4c284a --- /dev/null +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/Repositories.groovy @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
[jira] [Work logged] (BEAM-3608) Vendor Guava
[ https://issues.apache.org/jira/browse/BEAM-3608?focusedWorklogId=165183&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165183 ] ASF GitHub Bot logged work on BEAM-3608: Author: ASF GitHub Bot Created on: 12/Nov/18 22:41 Start Date: 12/Nov/18 22:41 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #6809: [BEAM-3608] Vendor guava 20.0 URL: https://github.com/apache/beam/pull/6809#issuecomment-438054066 Merging this for now, it seems as though we should include the pom.xml and pom.properties file even though they will have limited value. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165183) Time Spent: 5h 50m (was: 5h 40m) > Vendor Guava > > > Key: BEAM-3608 > URL: https://issues.apache.org/jira/browse/BEAM-3608 > Project: Beam > Issue Type: Sub-task > Components: runner-core, sdk-java-core >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > > Instead of shading as part of our build, we can shade before build so that it > is apparent when reading code, and in IDEs, that a particular class resides > in a hidden namespace. > {{import com.google.common.reflect.TypeToken}} > becomes something like > {{import org.apache.beam.private.guava21.com.google.common.reflect.TypeToken}} > So we can very trivially ban `org.apache.beam.private` from public APIs > unless they are annotated {{@Internal}}, and it makes sharing between our own > modules never get broken by shading again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5299) Define max global window as a shared value in protos like URN enums.
[ https://issues.apache.org/jira/browse/BEAM-5299?focusedWorklogId=165182&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165182 ] ASF GitHub Bot logged work on BEAM-5299: Author: ASF GitHub Bot Created on: 12/Nov/18 22:41 Start Date: 12/Nov/18 22:41 Worklog Time Spent: 10m Work Description: HuangLED opened a new pull request #7014: [BEAM-5299] Update worker container version to most recent release. URL: https://github.com/apache/beam/pull/7014 BEAM-5299 and BEAM-5931. Update worker container version (newly released 20181112). Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165182) Time Spent: 13h (was: 12h 50m) > Define max global window as a shared value in protos like UR
[jira] [Work logged] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?focusedWorklogId=165181&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165181 ]

ASF GitHub Bot logged work on BEAM-6047:

Author: ASF GitHub Bot
Created on: 12/Nov/18 22:28
Start Date: 12/Nov/18 22:28
Worklog Time Spent: 10m
Work Description: udim commented on issue #7013: [BEAM-6047] DO NOT MERGE: listing containers
URL: https://github.com/apache/beam/pull/7013#issuecomment-438050468

run python postcommit

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165181)
Time Spent: 0.5h (was: 20m)

> hdfsIntegrationTest is failing due to DisallowedDatanodeException
> -
>
> Key: BEAM-6047
> URL: https://issues.apache.org/jira/browse/BEAM-6047
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core, test-failures
> Reporter: Chamikara Jayalath
> Assignee: Udi Meiri
> Priority: Critical
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> beam_PostCommit_Python_Verify is perma red due to this.
>
> [https://scans.gradle.com/s/pqg3ent77c2pu/console-log?task=:beam-sdks-python:hdfsIntegrationTest#L1653]
>
> namenode_1_d5b2d221b2e2 | org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException: Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=717d9e8f-ff6b-449c-bc94-6deb57455f93, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-ef5ccec9-8245-45cc-9cc1-b103684e8ce1;nsid=1522131886;c=1542034438259)
>
> datanode_1_3ee1bc415f49 | 18/11/10 18:03:06 ERROR datanode.DataNode: Initialization failed for Block pool BP-1560762326-172.18.0.2-1541872973618 (Datanode Uuid 367ac165-40b3-437f-97dc-567cb8d15da5) service to namenode/172.18.0.2:8020 Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=367ac165-40b3-437f-97dc-567cb8d15da5, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-d00d4259-9d80-4952-85d1-a0012d91842d;nsid=1271723208;c=1541872973618)
>
> Udi, any idea ?
>
> cc: [~swegner] [~altay]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?focusedWorklogId=165178&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165178 ]

ASF GitHub Bot logged work on BEAM-6047:

Author: ASF GitHub Bot
Created on: 12/Nov/18 22:23
Start Date: 12/Nov/18 22:23
Worklog Time Spent: 10m
Work Description: udim commented on issue #7013: [BEAM-6047] DO NOT MERGE: listing containers
URL: https://github.com/apache/beam/pull/7013#issuecomment-438049202

run python postcommit

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165178)
Time Spent: 20m (was: 10m)

> hdfsIntegrationTest is failing due to DisallowedDatanodeException
> -
>
> Key: BEAM-6047
> URL: https://issues.apache.org/jira/browse/BEAM-6047
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core, test-failures
> Reporter: Chamikara Jayalath
> Assignee: Udi Meiri
> Priority: Critical
> Time Spent: 20m
> Remaining Estimate: 0h
>
> beam_PostCommit_Python_Verify is perma red due to this.
>
> [https://scans.gradle.com/s/pqg3ent77c2pu/console-log?task=:beam-sdks-python:hdfsIntegrationTest#L1653]
>
> namenode_1_d5b2d221b2e2 | org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException: Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=717d9e8f-ff6b-449c-bc94-6deb57455f93, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-ef5ccec9-8245-45cc-9cc1-b103684e8ce1;nsid=1522131886;c=1542034438259)
>
> datanode_1_3ee1bc415f49 | 18/11/10 18:03:06 ERROR datanode.DataNode: Initialization failed for Block pool BP-1560762326-172.18.0.2-1541872973618 (Datanode Uuid 367ac165-40b3-437f-97dc-567cb8d15da5) service to namenode/172.18.0.2:8020 Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=367ac165-40b3-437f-97dc-567cb8d15da5, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-d00d4259-9d80-4952-85d1-a0012d91842d;nsid=1271723208;c=1541872973618)
>
> Udi, any idea ?
>
> cc: [~swegner] [~altay]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6037) Make Spark runner pipeline translation based on URNs
[ https://issues.apache.org/jira/browse/BEAM-6037?focusedWorklogId=165173&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165173 ] ASF GitHub Bot logged work on BEAM-6037: Author: ASF GitHub Bot Created on: 12/Nov/18 22:09 Start Date: 12/Nov/18 22:09 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #7005: [BEAM-6037] Make Spark runner pipeline translation based on URNs URL: https://github.com/apache/beam/pull/7005 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index bf4ffbd8312..86a414c4c50 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -38,6 +38,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.CombineComponents; import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents; import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator; import org.apache.beam.sdk.Pipeline; @@ -88,6 +89,8 @@ getUrn(StandardPTransforms.Composites.COMBINE_PER_KEY); public static final String COMBINE_GLOBALLY_TRANSFORM_URN = getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY); + public static final String 
COMBINE_GROUPED_VALUES_TRANSFORM_URN = + getUrn(CombineComponents.COMBINE_GROUPED_VALUES); public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE); public static final String WRITE_FILES_TRANSFORM_URN = getUrn(StandardPTransforms.Composites.WRITE_FILES); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index 8e2f917bb82..4662d81cf98 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -91,9 +91,7 @@ private boolean shouldDebug(final TransformHierarchy.Node node) { @SuppressWarnings("unchecked") TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings("unchecked") -Class transformClass = (Class) transform.getClass(); -@SuppressWarnings("unchecked") -TransformEvaluator evaluator = translate(node, transform, transformClass); +TransformEvaluator evaluator = translate(node, transform); if (shouldDebug(node)) { transforms.add(new NativeTransform(node, evaluator, transform, false)); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index ca86bde5136..1eeb1c1c291 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -374,13 +374,11 @@ public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) { @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - if (node.getTransform() != null) { -@SuppressWarnings("unchecked") -Class> transformClass = -(Class>) node.getTransform().getClass(); -if (translator.hasTranslation(transformClass) && 
!shouldDefer(node)) { + PTransform transform = node.getTransform(); + if (transform != null) { +if (translator.hasTranslation(transform) && !shouldDefer(node)) { LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); - LOG.debug("Composite transform class: '{}'", transformClass); + LOG.debug("Composite transform class: '{}'", transform); doVisitTransform(node); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } @@ -433,9 +431,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { @SuppressWarnings("unchecked") TransformT transform = (TransformT) node.getTransform(); @SuppressWarnings
[jira] [Work logged] (BEAM-6047) hdfsIntegrationTest is failing due to DisallowedDatanodeException
[ https://issues.apache.org/jira/browse/BEAM-6047?focusedWorklogId=165174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165174 ]

ASF GitHub Bot logged work on BEAM-6047:

Author: ASF GitHub Bot
Created on: 12/Nov/18 22:09
Start Date: 12/Nov/18 22:09
Worklog Time Spent: 10m
Work Description: udim commented on issue #7013: [BEAM-6047] DO NOT MERGE: listing containers
URL: https://github.com/apache/beam/pull/7013#issuecomment-438045826

run python postcommit

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165174)
Time Spent: 10m
Remaining Estimate: 0h

> hdfsIntegrationTest is failing due to DisallowedDatanodeException
> -
>
> Key: BEAM-6047
> URL: https://issues.apache.org/jira/browse/BEAM-6047
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core, test-failures
> Reporter: Chamikara Jayalath
> Assignee: Udi Meiri
> Priority: Critical
> Time Spent: 10m
> Remaining Estimate: 0h
>
> beam_PostCommit_Python_Verify is perma red due to this.
>
> [https://scans.gradle.com/s/pqg3ent77c2pu/console-log?task=:beam-sdks-python:hdfsIntegrationTest#L1653]
>
> namenode_1_d5b2d221b2e2 | org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException: Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=717d9e8f-ff6b-449c-bc94-6deb57455f93, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-ef5ccec9-8245-45cc-9cc1-b103684e8ce1;nsid=1522131886;c=1542034438259)
>
> datanode_1_3ee1bc415f49 | 18/11/10 18:03:06 ERROR datanode.DataNode: Initialization failed for Block pool BP-1560762326-172.18.0.2-1541872973618 (Datanode Uuid 367ac165-40b3-437f-97dc-567cb8d15da5) service to namenode/172.18.0.2:8020 Datanode denied communication with namenode because hostname cannot be resolved (ip=172.18.0.3, hostname=172.18.0.3): DatanodeRegistration(0.0.0.0:50010, datanodeUuid=367ac165-40b3-437f-97dc-567cb8d15da5, infoPort=50075, infoSecurePort=0, ipcPort=50020, storageInfo=lv=-57;cid=CID-d00d4259-9d80-4952-85d1-a0012d91842d;nsid=1271723208;c=1541872973618)
>
> Udi, any idea ?
>
> cc: [~swegner] [~altay]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165161&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165161 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:44 Start Date: 12/Nov/18 21:44 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232820429 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we i
[jira] [Work logged] (BEAM-5446) SplittableDoFn: Remove runner time execution information from public API surface
[ https://issues.apache.org/jira/browse/BEAM-5446?focusedWorklogId=165172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165172 ] ASF GitHub Bot logged work on BEAM-5446: Author: ASF GitHub Bot Created on: 12/Nov/18 22:07 Start Date: 12/Nov/18 22:07 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #6467: [BEAM-5446] SplittableDoFn: Remove "internal" methods for public API surface URL: https://github.com/apache/beam/pull/6467 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index 7f4c95182d5..85dce13e133 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; @@ -126,13 +125,12 @@ public void translate(ParDo.MultiOutput transform, TranslationC } } - static class SplittableProcessElementsTranslator< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> - implements TransformTranslator> { + static class SplittableProcessElementsTranslator + implements TransformTranslator> { @Override public void translate( -ProcessElements transform, +ProcessElements transform, 
TranslationContext context) { Map, PValue> outputs = context.getOutputs(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java index 3f361405c5e..261e86f23cc 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java @@ -109,8 +109,7 @@ public PCollectionTuple expand(PCollection>> } } - static class NaiveProcessFn< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker> + static class NaiveProcessFn extends DoFn, OutputT> { private final DoFn fn; @@ -142,7 +141,7 @@ public void process(ProcessContext c, BoundedWindow w) { InputT element = c.element().getKey(); RestrictionT restriction = c.element().getValue(); while (true) { -TrackerT tracker = invoker.invokeNewTracker(restriction); +RestrictionTracker tracker = invoker.invokeNewTracker(restriction); ProcessContinuation continuation = invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker)); if (continuation.shouldResume()) { diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 86dbfbfdb84..7155efb1555 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -163,7 +163,8 @@ public void simpleProcess(ProcessContext ctxt) { private DoFn, Integer> splittableDoFn = new DoFn, Integer>() { 
@ProcessElement -public void processElement(ProcessContext context, SomeTracker tracker) {} +public void processElement( +ProcessContext context, RestrictionTracker tracker) {} @GetInitialRestriction public Void getInitialRestriction(KV element) { diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index 68365c85bc9..959120c7d54 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/Spli
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165156&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165156 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:44 Start Date: 12/Nov/18 21:44 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232822073 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we i
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165157&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165157 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:44 Start Date: 12/Nov/18 21:44 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232821492 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.data; + +import java.util.HashSet; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.InboundDataClient; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed. In + * the thread which calls drainAndBlock. + */ +public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient { + + private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class); + + private final BeamFnDataClient mainClient; + private final SynchronousQueue queue; + private final HashSet idcs; + + public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) { +this.mainClient = mainClient; +this.queue = new SynchronousQueue<>(); +// TODO does this need to be a concurrent hash map (set doesn't seem to exist). +this.idcs = new HashSet(); + } + + /** + * Registers the following inbound stream consumer for the provided instruction id and target. + * + * The provided coder is used to decode elements on the inbound stream. The decoded elements + * are passed to the provided consumer. Any failure during decoding or processing of the element + * will complete the returned future exceptionally. On successful termination of the stream + * (signaled by an empty data block), the returned future is completed successfully. 
+ */ + @Override + public InboundDataClient receive( + ApiServiceDescriptor apiServiceDescriptor, + LogicalEndpoint inputLocation, + Coder> coder, + FnDataReceiver> consumer) { +LOG.debug( +"Registering consumer for instruction {} and target {}", +inputLocation.getInstructionId(), +inputLocation.getTarget()); + +QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer); +InboundDataClient idc = +this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer); +newConsumer.idc = idc; +this.idcs.add(idc); +return idc; + } + + private boolean AllDone() { +boolean allDone = true; +for (InboundDataClient idc : idcs) { + allDone &= idc.isDone(); +} +return allDone; + } + + /** + * Drains the internal queue of this class, by waiting for all WindowValues to be passed to thier + * consumers. The thread which wishes to process() the elements should call this method, as this + * will cause the consumers to invoke element processing. All receive() and send() calls must be + * made prior to calling drainAndBlock, in order to properly terminate. + */ + public void drainAndBlock() throws Exception { +// Note: We just throw the exception here +// TODO review the error handling here +while (true) { + ConsumerAndData tuple = null; + tuple = queue.poll(50, TimeUnit.MILLISECONDS); + // TODO should we i
[jira] [Work logged] (BEAM-5953) Support DataflowRunner on Python 3
[ https://issues.apache.org/jira/browse/BEAM-5953?focusedWorklogId=165151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165151 ]

ASF GitHub Bot logged work on BEAM-5953:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:41
Start Date: 12/Nov/18 21:41
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #7011: [BEAM-5953] Fix DataflowRunner in Python 3 - type errors
URL: https://github.com/apache/beam/pull/7011

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index ec85c288731..d4c4bfe7641 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -107,6 +107,8 @@ def to_json_value(obj, with_type=False):
     return to_json_value(get_typed_value_descriptor(obj), with_type=False)
   elif isinstance(obj, (str, unicode)):
     return extra_types.JsonValue(string_value=obj)
+  elif isinstance(obj, bytes):
+    return extra_types.JsonValue(string_value=obj.decode('utf8'))
   elif isinstance(obj, bool):
     return extra_types.JsonValue(boolean_value=obj)
   elif isinstance(obj, (int, long)):
diff --git a/sdks/python/apache_beam/internal/gcp/json_value_test.py b/sdks/python/apache_beam/internal/gcp/json_value_test.py
index e7cf7f15e36..e6d064a92d5 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value_test.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value_test.py
@@ -41,6 +41,9 @@ class JsonValueTest(unittest.TestCase):
   def test_string_to(self):
     self.assertEquals(JsonValue(string_value='abc'), to_json_value('abc'))
 
+  def test_bytes_to(self):
+    self.assertEquals(JsonValue(string_value='abc'), to_json_value(b'abc'))
+
   def test_true_to(self):
     self.assertEquals(JsonValue(boolean_value=True), to_json_value(True))
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 30f466b40ef..a135251fa8e 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -175,7 +175,7 @@ def rank_error(msg):
         for m in messages:
           message = '%s: %s: %s' % (m.time, m.messageImportance, m.messageText)
 
-          if m.time > last_message_time:
+          if not last_message_time or m.time > last_message_time:
             last_message_time = m.time
             current_seen_messages = set()
 
@@ -522,7 +522,7 @@ def run_Impulse(self, transform_node):
     encoded_impulse_element = coders.WindowedValueCoder(
         coders.BytesCoder(),
         coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
-            window.GlobalWindows.windowed_value(''))
+            window.GlobalWindows.windowed_value(b''))
     step.add_property(PropertyNames.IMPULSE_ELEMENT,
                       self.byte_array_to_json_string(encoded_impulse_element))
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165151)
Time Spent: 1h 10m (was: 1h)

> Support DataflowRunner on Python 3
> --
>
> Key: BEAM-5953
> URL: https://issues.apache.org/jira/browse/BEAM-5953
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
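The two Python 3 pitfalls touched by the BEAM-5953 diff above (bytes passed where a text string is expected, and an ordering comparison against an unset value) can be illustrated with a small standalone sketch. This is not Beam code: `to_json_string_value` and `latest_message_time` are hypothetical helpers written only for illustration, and the standard `json` module stands in for `extra_types.JsonValue`.

```python
import json


def to_json_string_value(obj):
  """Hypothetical stand-in for the str/bytes branches of to_json_value."""
  if isinstance(obj, bytes):
    # In Python 3, bytes are not JSON-serializable; decode them to text first.
    obj = obj.decode('utf8')
  if not isinstance(obj, str):
    raise TypeError('expected str or bytes, got %s' % type(obj).__name__)
  return json.dumps(obj)


def latest_message_time(times, last_message_time=None):
  """Hypothetical helper: returns the newest timestamp seen so far."""
  for t in times:
    # In Python 3, `t > None` raises TypeError, so test for "unset" first.
    if not last_message_time or t > last_message_time:
      last_message_time = t
  return last_message_time
```

Under Python 2 the unguarded comparison `m.time > None` simply evaluated without error, which is why the guard only became necessary when running on Python 3.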
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165162&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165162 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:45
Start Date: 12/Nov/18 21:45
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232822746

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in
+ * the thread which calls drainAndBlock.
+ */
+public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class);
+
+  private final BeamFnDataClient mainClient;
+  private final SynchronousQueue<ConsumerAndData> queue;
+  private final HashSet<InboundDataClient> idcs;
+
+  public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) {
+    this.mainClient = mainClient;
+    this.queue = new SynchronousQueue<>();
+    // TODO does this need to be a concurrent hash map (set doesn't seem to exist).
+    this.idcs = new HashSet();
+  }
+
+  /**
+   * Registers the following inbound stream consumer for the provided instruction id and target.
+   *
+   * The provided coder is used to decode elements on the inbound stream. The decoded elements
+   * are passed to the provided consumer. Any failure during decoding or processing of the element
+   * will complete the returned future exceptionally. On successful termination of the stream
+   * (signaled by an empty data block), the returned future is completed successfully.
+   */
+  @Override
+  public <T> InboundDataClient receive(
+      ApiServiceDescriptor apiServiceDescriptor,
+      LogicalEndpoint inputLocation,
+      Coder<WindowedValue<T>> coder,
+      FnDataReceiver<WindowedValue<T>> consumer) {
+    LOG.debug(
+        "Registering consumer for instruction {} and target {}",
+        inputLocation.getInstructionId(),
+        inputLocation.getTarget());
+
+    QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer);
+    InboundDataClient idc =
+        this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer);
+    newConsumer.idc = idc;
+    this.idcs.add(idc);
+    return idc;
+  }
+
+  private boolean AllDone() {

Review comment: note: Java method convention is for this method to be called `allDone`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165162)
Time Spent: 6h (was: 5h 50m)

> Make process, finish and start run on the same thread to support metrics.
> -
>
> Key: BEAM-5850
> URL: https://issues.apache.org/jira/browse/BEAM-5850
> Project: Beam
>
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165155&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165155 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:44
Start Date: 12/Nov/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232818685

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in
+ * the thread which calls drainAndBlock.
+ */
+public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class);
+
+  private final BeamFnDataClient mainClient;
+  private final SynchronousQueue<ConsumerAndData> queue;
+  private final HashSet<InboundDataClient> idcs;
+
+  public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) {
+    this.mainClient = mainClient;
+    this.queue = new SynchronousQueue<>();
+    // TODO does this need to be a concurrent hash map (set doesn't seem to exist).
+    this.idcs = new HashSet();
+  }
+
+  /**
+   * Registers the following inbound stream consumer for the provided instruction id and target.
+   *
+   * The provided coder is used to decode elements on the inbound stream. The decoded elements
+   * are passed to the provided consumer. Any failure during decoding or processing of the element
+   * will complete the returned future exceptionally. On successful termination of the stream
+   * (signaled by an empty data block), the returned future is completed successfully.
+   */
+  @Override
+  public <T> InboundDataClient receive(
+      ApiServiceDescriptor apiServiceDescriptor,
+      LogicalEndpoint inputLocation,
+      Coder<WindowedValue<T>> coder,
+      FnDataReceiver<WindowedValue<T>> consumer) {
+    LOG.debug(
+        "Registering consumer for instruction {} and target {}",
+        inputLocation.getInstructionId(),
+        inputLocation.getTarget());
+
+    QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer);
+    InboundDataClient idc =
+        this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer);
+    newConsumer.idc = idc;
+    this.idcs.add(idc);
+    return idc;
+  }
+
+  private boolean AllDone() {
+    boolean allDone = true;
+    for (InboundDataClient idc : idcs) {
+      allDone &= idc.isDone();
+    }
+    return allDone;
+  }
+
+  /**
+   * Drains the internal queue of this class, by waiting for all WindowValues to be passed to their
+   * consumers. The thread which wishes to process() the elements should call this method, as this
+   * will cause the consumers to invoke element processing. All receive() and send() calls must be
+   * made prior to calling drainAndBlock, in order to properly terminate.
+   */
+  public void drainAndBlock() throws Exception {
+    // Note: We just throw the exception here
+    // TODO review the error handling here
+    while (true) {
+      ConsumerAndData tuple = null;
+      tuple = queue.poll(50, TimeUnit.MILLISECONDS);
+      // TODO should we i
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165154&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165154 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:44
Start Date: 12/Nov/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232817746

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in
+ * the thread which calls drainAndBlock.
+ */
+public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient {

Review comment: nit: Rename to QueueingBeamFnDataClient (gRPC or not doesn't matter).
optional: Make this an internal static class within ProcessBundleHandler?

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165154)
Time Spent: 4h 40m (was: 4.5h)

> Make process, finish and start run on the same thread to support metrics.
> -
>
> Key: BEAM-5850
> URL: https://issues.apache.org/jira/browse/BEAM-5850
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Alex Amato
> Priority: Major
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> Update BeamFnDataReceiver to place elements into a Queue, consume them,
> and call the element processing receiver in blockTillReadFinishes

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165158 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:44
Start Date: 12/Nov/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232818309

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in
+ * the thread which calls drainAndBlock.
+ */
+public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class);
+
+  private final BeamFnDataClient mainClient;
+  private final SynchronousQueue<ConsumerAndData> queue;
+  private final HashSet<InboundDataClient> idcs;
+
+  public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) {
+    this.mainClient = mainClient;
+    this.queue = new SynchronousQueue<>();
+    // TODO does this need to be a concurrent hash map (set doesn't seem to exist).
+    this.idcs = new HashSet();
+  }
+
+  /**
+   * Registers the following inbound stream consumer for the provided instruction id and target.
+   *
+   * The provided coder is used to decode elements on the inbound stream. The decoded elements
+   * are passed to the provided consumer. Any failure during decoding or processing of the element
+   * will complete the returned future exceptionally. On successful termination of the stream
+   * (signaled by an empty data block), the returned future is completed successfully.
+   */
+  @Override
+  public <T> InboundDataClient receive(
+      ApiServiceDescriptor apiServiceDescriptor,
+      LogicalEndpoint inputLocation,
+      Coder<WindowedValue<T>> coder,
+      FnDataReceiver<WindowedValue<T>> consumer) {
+    LOG.debug(
+        "Registering consumer for instruction {} and target {}",
+        inputLocation.getInstructionId(),
+        inputLocation.getTarget());
+
+    QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer);

Review comment: nit: `newConsumer` -> `queueingConsumer`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165158)
Time Spent: 5h 20m (was: 5h 10m)

> Make process, finish and start run on the same thread to support metrics.
> -
>
> Key: BEAM-5850
> URL: https://issues.apache.org/jira/browse/BEAM-5850
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Alex Amato
> Priority: Major
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> Update BeamFnDataReceive
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165160&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165160 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:44
Start Date: 12/Nov/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232821394

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in
+ * the thread which calls drainAndBlock.
+ */
+public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class);
+
+  private final BeamFnDataClient mainClient;
+  private final SynchronousQueue<ConsumerAndData> queue;
+  private final HashSet<InboundDataClient> idcs;
+
+  public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) {
+    this.mainClient = mainClient;
+    this.queue = new SynchronousQueue<>();
+    // TODO does this need to be a concurrent hash map (set doesn't seem to exist).
+    this.idcs = new HashSet();
+  }
+
+  /**
+   * Registers the following inbound stream consumer for the provided instruction id and target.
+   *
+   * The provided coder is used to decode elements on the inbound stream. The decoded elements
+   * are passed to the provided consumer. Any failure during decoding or processing of the element
+   * will complete the returned future exceptionally. On successful termination of the stream
+   * (signaled by an empty data block), the returned future is completed successfully.
+   */
+  @Override
+  public <T> InboundDataClient receive(
+      ApiServiceDescriptor apiServiceDescriptor,
+      LogicalEndpoint inputLocation,
+      Coder<WindowedValue<T>> coder,
+      FnDataReceiver<WindowedValue<T>> consumer) {
+    LOG.debug(
+        "Registering consumer for instruction {} and target {}",
+        inputLocation.getInstructionId(),
+        inputLocation.getTarget());
+
+    QueueingFnDataReceiver newConsumer = new QueueingFnDataReceiver(consumer);
+    InboundDataClient idc =
+        this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, newConsumer);
+    newConsumer.idc = idc;
+    this.idcs.add(idc);
+    return idc;
+  }
+
+  private boolean AllDone() {
+    boolean allDone = true;
+    for (InboundDataClient idc : idcs) {
+      allDone &= idc.isDone();
+    }
+    return allDone;
+  }
+
+  /**
+   * Drains the internal queue of this class, by waiting for all WindowValues to be passed to their
+   * consumers. The thread which wishes to process() the elements should call this method, as this
+   * will cause the consumers to invoke element processing. All receive() and send() calls must be
+   * made prior to calling drainAndBlock, in order to properly terminate.
+   */
+  public void drainAndBlock() throws Exception {
+    // Note: We just throw the exception here
+    // TODO review the error handling here
+    while (true) {
+      ConsumerAndData tuple = null;
+      tuple = queue.poll(50, TimeUnit.MILLISECONDS);
+      // TODO should we i
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165159 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:44
Start Date: 12/Nov/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232817439

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ##
@@ -227,6 +229,8 @@ private void createRunnerAndConsumersForPTransformRecursively(
     ProcessBundleResponse.Builder response = ProcessBundleResponse.newBuilder();
 
+    boolean hasSinkPtransform = false;

Review comment: `hasSinkPtransform` -> `hasReadPTransform`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165159)
Time Spent: 5.5h (was: 5h 20m)

> Make process, finish and start run on the same thread to support metrics.
> -
>
> Key: BEAM-5850
> URL: https://issues.apache.org/jira/browse/BEAM-5850
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Alex Amato
> Priority: Major
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Update BeamFnDataReceiver to place elements into a Queue, consume them,
> and call the element processing receiver in blockTillReadFinishes

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165153&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165153 ]

ASF GitHub Bot logged work on BEAM-5850:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:44
Start Date: 12/Nov/18 21:44
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232818108

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueuingBeamFnDataGrpcClient.java ##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.HashSet;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be consumed and processed in
+ * the thread which calls drainAndBlock.
+ */
+public class QueuingBeamFnDataGrpcClient implements BeamFnDataClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueuingBeamFnDataGrpcClient.class);
+
+  private final BeamFnDataClient mainClient;
+  private final SynchronousQueue<ConsumerAndData> queue;
+  private final HashSet<InboundDataClient> idcs;
+
+  public QueuingBeamFnDataGrpcClient(BeamFnDataClient mainClient) {
+    this.mainClient = mainClient;
+    this.queue = new SynchronousQueue<>();
+    // TODO does this need to be a concurrent hash map (set doesn't seem to exist).

Review comment: Check out ConcurrentHashMap.newKeySet()

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165153)
Time Spent: 4.5h (was: 4h 20m)

> Make process, finish and start run on the same thread to support metrics.
> -
>
> Key: BEAM-5850
> URL: https://issues.apache.org/jira/browse/BEAM-5850
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Alex Amato
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Update BeamFnDataReceiver to place elements into a Queue, consume them,
> and call the element processing receiver in blockTillReadFinishes

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=165152&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165152 ]

ASF GitHub Bot logged work on BEAM-:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:41
Start Date: 12/Nov/18 21:41
Worklog Time Spent: 10m
Work Description: ihji commented on issue #6763: [BEAM-] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#issuecomment-438038046

run python postcommit

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165152)
Time Spent: 8h 40m (was: 8.5h)

> Parquet IO for Python SDK
> -
>
> Key: BEAM-
> URL: https://issues.apache.org/jira/browse/BEAM-
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Bruce Arctor
> Assignee: Heejong Lee
> Priority: Major
> Time Spent: 8h 40m
> Remaining Estimate: 0h
>
> Add Parquet Support for the Python SDK.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4444) Parquet IO for Python SDK
[ https://issues.apache.org/jira/browse/BEAM-?focusedWorklogId=165148&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165148 ]

ASF GitHub Bot logged work on BEAM-:

Author: ASF GitHub Bot
Created on: 12/Nov/18 21:40
Start Date: 12/Nov/18 21:40
Worklog Time Spent: 10m
Work Description: ihji removed a comment on issue #6763: [BEAM-] Parquet IO for Python SDK
URL: https://github.com/apache/beam/pull/6763#issuecomment-437997231

run python postcommit

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 165148)
Time Spent: 8.5h (was: 8h 20m)

> Parquet IO for Python SDK
> -
>
> Key: BEAM-
> URL: https://issues.apache.org/jira/browse/BEAM-
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Bruce Arctor
> Assignee: Heejong Lee
> Priority: Major
> Time Spent: 8.5h
> Remaining Estimate: 0h
>
> Add Parquet Support for the Python SDK.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165141&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165141 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:09 Start Date: 12/Nov/18 21:09 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232812251 ## File path: .gitignore ## @@ -14,6 +14,13 @@ sdks/**/vendor/**/* runners/**/vendor/**/* **/.gradletasknamecache +# Ignore gradle files generated by Intellij Review comment: done Issue Time Tracking --- Worklog Id: (was: 165141) Time Spent: 3h 50m (was: 3h 40m) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consume them > and call the element processing receiver in blockTillReadFinishes
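The BEAM-5850 description above (enqueue elements on the receiving side, then process them inside blockTillReadFinishes so that processing runs on the calling thread) could be sketched roughly as follows. This is a minimal illustration, not the code from pull request #6786: the class name `QueueingReceiverSketch`, the `complete()` method, and the end-of-stream sentinel are all hypothetical, and it uses a `LinkedBlockingQueue` where the PR under review uses a `SynchronousQueue`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch: hand elements from a receiving thread to the processing thread. */
final class QueueingReceiverSketch {
  // Sentinel marking the end of the stream (an assumption of this sketch).
  private static final Object END_OF_STREAM = new Object();

  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
  private final Consumer<String> processor;

  QueueingReceiverSketch(Consumer<String> processor) {
    this.processor = processor;
  }

  /** Called on the receiving thread: only enqueues, never processes. */
  void accept(String element) {
    queue.add(element);
  }

  /** Called on the receiving thread once no more elements will arrive. */
  void complete() {
    queue.add(END_OF_STREAM);
  }

  /** Called on the processing thread: drains the queue and processes each element here. */
  void blockTillReadFinishes() throws InterruptedException {
    while (true) {
      Object next = queue.take();
      if (next == END_OF_STREAM) {
        return;
      }
      processor.accept((String) next);
    }
  }
}
```

Because `processor` is only ever invoked from `blockTillReadFinishes`, any thread-local state it touches (metrics containers, for instance) belongs to the thread that called `blockTillReadFinishes`, which is the property the issue title asks for.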
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165144&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165144 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:12 Start Date: 12/Nov/18 21:12 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232813139 ## File path: sdks/java/gradle/wrapper/gradle-wrapper.properties ## @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME Review comment: obsolete Issue Time Tracking --- Worklog Id: (was: 165144) Time Spent: 4h 20m (was: 4h 10m) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 4h 20m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consume them > and call the element processing receiver in blockTillReadFinishes
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165143&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165143 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:10 Start Date: 12/Nov/18 21:10 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232812669 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java ## @@ -123,43 +141,142 @@ public void close() { * it is ready to consume that data. */ private final class InboundObserver implements StreamObserver { + +private final ConcurrentHashMap> instructionIdToQueue; +private final ConcurrentHashMap>> instructionIdToConsumers; + +public InboundObserver(boolean enableQueuing) { + this.instructionIdToQueue = + enableQueuing ? new ConcurrentHashMap>() : null; + this.instructionIdToConsumers = new ConcurrentHashMap>>(); +} + +private void registerConsumer( +String instructionId, Consumer consumer) { + this.instructionIdToConsumers.putIfAbsent(instructionId, new HashSet>()); + this.instructionIdToConsumers.get(instructionId).add(consumer); +} + +private boolean isFinalizationData(BeamFnApi.Elements.Data data) { + return data.getData().isEmpty(); +} + +// TODO overall error handling needs an audit and possibly we need to be creative about +// testing here. Maybe handleData should just throw all exceptions, and catch them +// and call onError in the calling code? +private void handleData(BeamFnApi.Elements.Data data) { + try { +LogicalEndpoint key = LogicalEndpoint.of(data.getInstructionReference(), data.getTarget()); +CompletableFuture> consumer = receiverFuture(key); +if (!consumer.isDone()) { + LOG.debug( + "Received data for key {} without consumer ready. 
" + + "Waiting for consumer to be registered.", + key); +} +consumer.get().accept(data); +if (isFinalizationData(data)) { + consumers.remove(key); +} +/* + * TODO: On failure we should fail any bundles that were impacted eagerly + * instead of relying on the Runner harness to do all the failure handling. + */ + } catch (ExecutionException | InterruptedException e) { +LOG.error( +"Client interrupted during handling of data for instruction {} and target {}", +data.getInstructionReference(), +data.getTarget(), +e); +outboundObserver.onError(e); + } catch (RuntimeException e) { +LOG.error( +"Client failed to handle data for instruction {} and target {}", +data.getInstructionReference(), +data.getTarget(), +e); +outboundObserver.onError(e); + } +} + +private boolean IsQueueingEnabled() { + return instructionIdToQueue != null; +} + + +// TODO please help reveiw the concurrency aspect here. Questions: +// - Safe use of ConcurrentHashMap? +// - Unnecessary use of ConcurrentHashMap? +// - Is there some code that can run concurrently now, which was not before? @Override public void onNext(BeamFnApi.Elements value) { for (BeamFnApi.Elements.Data data : value.getDataList()) { try { - LogicalEndpoint key = - LogicalEndpoint.of(data.getInstructionReference(), data.getTarget()); - CompletableFuture> consumer = receiverFuture(key); - if (!consumer.isDone()) { -LOG.debug( -"Received data for key {} without consumer ready. " -+ "Waiting for consumer to be registered.", -key); - } - consumer.get().accept(data); - if (data.getData().isEmpty()) { -consumers.remove(key); + if (IsQueueingEnabled()) { +// TODO Is there a cleaner way do to this? 
+// I was trying to avoid always doing multiple lookups, and always creating +// a new instance on the heap +SynchronousQueue queue = instructionIdToQueue.get(data.getInstructionReference()); +if (queue == null) { + instructionIdToQueue.putIfAbsent( + data.getInstructionReference(), new SynchronousQueue()); + // Lookup the queue again incase there was a race and another thread added one. + queue = instructionIdToQueue.get(data.getInstructionReference()); +} + +// T
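The quoted diff asks whether there is a cleaner way to do the get / putIfAbsent / get sequence, and whether the ConcurrentHashMap usage is safe. One possible answer, offered only as a sketch and not as what the PR ended up doing, is `ConcurrentHashMap.computeIfAbsent`, which looks up or creates the value in one atomic step and only allocates when the key is missing. The class name `InstructionRegistrySketch`, the method names `queueFor` and `consumersFor`, and the generic parameter `T` are hypothetical; the element types in the original diff were lost in extraction, so they are not reproduced here.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of the two per-instruction maps from the quoted diff. */
final class InstructionRegistrySketch<T> {
  private final ConcurrentHashMap<String, SynchronousQueue<T>> instructionIdToQueue =
      new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Set<Consumer<T>>> instructionIdToConsumers =
      new ConcurrentHashMap<>();

  /** Single atomic lookup-or-create; the queue is only allocated when the key is absent. */
  SynchronousQueue<T> queueFor(String instructionId) {
    return instructionIdToQueue.computeIfAbsent(instructionId, id -> new SynchronousQueue<>());
  }

  /**
   * Uses a concurrent key set rather than a plain HashSet, so that two threads
   * registering consumers for the same instruction do not race on the set itself.
   */
  void registerConsumer(String instructionId, Consumer<T> consumer) {
    instructionIdToConsumers
        .computeIfAbsent(instructionId, id -> ConcurrentHashMap.newKeySet())
        .add(consumer);
  }

  /** Read-only view of the registered consumers; empty if none were registered. */
  Set<Consumer<T>> consumersFor(String instructionId) {
    Set<Consumer<T>> consumers = instructionIdToConsumers.get(instructionId);
    return consumers == null
        ? Collections.emptySet()
        : Collections.unmodifiableSet(consumers);
  }
}
```

In the quoted `registerConsumer`, the map is concurrent but the `HashSet` stored inside it is not, so concurrent `add` calls on the same set would be unsafe; the key-set variant above is one way to close that gap, assuming consumers really can be registered from more than one thread.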
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165142&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165142 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:09 Start Date: 12/Nov/18 21:09 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232812318 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/EnqueuingFnDataReceiver.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.data; + +import java.util.concurrent.SynchronousQueue; +import org.apache.beam.sdk.fn.data.FnDataReceiver; + + +/** + * A {@link FnDataReceiver} which forwards all received inputs to a collection of {@link + * FnDataReceiver receivers}. + */ +public class EnqueuingFnDataReceiver implements FnDataReceiver { + + private final SynchronousQueue queue; Review comment: obsolete This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 165142) Time Spent: 4h (was: 3h 50m) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 4h > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consume them > and call the element processing receiver in blockTillReadFinishes
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165140&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165140 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:08 Start Date: 12/Nov/18 21:08 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232812219 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java ## @@ -87,7 +91,7 @@ Map pCollections, Map coders, Map windowingStrategies, -ListMultimap>> pCollectionIdsToConsumers, +ListMultimap>> pCollectionIdsToConsumers, //FnDataReceiver has PcollectionSize and elem count. ajamato Review comment: rm'd changes to this file Issue Time Tracking --- Worklog Id: (was: 165140) Time Spent: 3h 40m (was: 3.5h) > Make process, finish and start run on the same thread to support metrics. > - > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution >Reporter: Alex Amato >Assignee: Alex Amato >Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consume them > and call the element processing receiver in blockTillReadFinishes
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165139&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165139 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:08 Start Date: 12/Nov/18 21:08 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232812125 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java ## @@ -123,43 +141,142 @@
[jira] [Work logged] (BEAM-5850) Make process, finish and start run on the same thread to support metrics.
[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165138&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165138 ] ASF GitHub Bot logged work on BEAM-5850: Author: ASF GitHub Bot Created on: 12/Nov/18 21:08 Start Date: 12/Nov/18 21:08 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232811990 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java ## @@ -123,43 +141,142 @@