Re: Transform-specific thread pools in Python

2021-05-11 Thread Stephan Hoyer
On Mon, May 10, 2021 at 4:28 PM Ahmet Altay  wrote:

>
>
> On Mon, May 10, 2021 at 8:01 AM Stephan Hoyer  wrote:
>
>> Hi Beam devs,
>>
>> I've been exploring recently how to optimize IO bound steps for my Python
>> Beam pipelines, and have come up with a solution that I think might make
>> sense to upstream into Beam's Python SDK.
>>
>> It appears that Beam runners (at least the Cloud Dataflow runner)
>> typically use only a single thread per Python process.
>>
>
> I thought the default was not 1 but something else (12?). Maybe that
> changed.
>

>
>> The number of threads per worker can be adjusted with flags, but only for
>> the entire pipeline. This behavior makes sense *in general* under the
>> worst-case assumption that user-code in Python is CPU bound and requires
>> the GIL.
>>
>> However, multiple threads can be quite helpful in many cases, e.g.,
>> 1. CPU bound tasks that release the GIL. This is typically the case when
>> using libraries for numerical computing, such as NumPy and pandas.
>> 2. IO bound tasks that can be run asynchronously, e.g., reading/writing
>> files or RPCs. This is the use-case for which not using threads can be most
>> problematic, e.g., in a recent dataflow pipeline reading/writing lots of
>> relatively small files (~1-10 MB) to cloud storage with the default number
>> of threads per worker, I found that I was only using ~20% of available CPU.
>>
>> Because the optimal number of threads for Python code can be quite
>> heterogeneous, I would like to be able to indicate that particular steps of
>> my Beam pipelines should be executed using more threads. This would be
>> particularly valuable for writing libraries of custom IO transforms, which
>> should still conservatively assume that *other* steps in user provided
>> pipelines may be CPU bound.
>>
>> The solution I've come up with is to use beam.BatchElements with a ParDo
>> function that executes tasks in separate threads (via
>> concurrent.futures.ThreadPoolExecutor). I've used this to make high-level wrappers
>> like beam.Map, beam.MapTuple, etc that execute with multiple threads. This
>> seems to work pretty well for my use-cases. I can put these in my own
>> library, of course, but perhaps these would make sense upstream into Beam's
>> Python SDK itself?
>>
>
> I believe a related idea (async pardo) was discussed and some work was
> done earlier (https://issues.apache.org/jira/browse/BEAM-6550). AFAIK
> Flink also has a similar concept (
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/asyncio/)
> as well.
>
> Perhaps you can share a bit more details about your proposal along with
> your code and people could provide more feedback on that.
>
>
Yes, async ParDo does look like the same thing! This sounds like exactly
the same use-case -- a ParDo for IO that can be asynchronously executed.

In fact, Python has async IO, too. In my particular case threads are
slightly more convenient (the libraries I'm using do not natively support
>> async), but that is really a minor detail. If we had an AsyncParDo for
Python, I could make that work very easily. In the worst case, I could use
a separate thread inside each async call.
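
For concreteness, a minimal sketch of that fallback: wrapping a blocking
library call so it could be awaited from a hypothetical AsyncParDo. Here
blocking_read just stands in for a library call without async support.

import asyncio
import concurrent.futures

_pool = concurrent.futures.ThreadPoolExecutor()

def blocking_read(path):
    # Stands in for any library call that does not support asyncio.
    with open(path, 'rb') as f:
        return f.read()

async def async_read(path):
    # Run the blocking call in a worker thread so the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, blocking_read, path)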

In any case, see here for my implementation of a ThreadMap ptransform:
https://github.com/google/xarray-beam/blob/0.0.1/xarray_beam/_src/threadmap.py
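
For reference, here is a rough sketch of the approach (not the actual
xarray-beam code; the defaults and internals below are illustrative): batch
elements with beam.BatchElements, then apply the user function to each batch
via a thread pool inside a DoFn.

import concurrent.futures

import apache_beam as beam


class _MapBatchWithThreads(beam.DoFn):
    def __init__(self, fn, num_threads):
        self._fn = fn
        self._num_threads = num_threads

    def setup(self):
        self._pool = concurrent.futures.ThreadPoolExecutor(self._num_threads)

    def process(self, batch):
        # map() preserves input order; each call runs in its own thread, so
        # IO-bound (or GIL-releasing) work overlaps within the batch.
        yield from self._pool.map(self._fn, batch)

    def teardown(self):
        self._pool.shutdown()


class ThreadMap(beam.PTransform):
    """Like beam.Map, but applies fn concurrently using a thread pool."""

    def __init__(self, fn, num_threads=16, batch_size=16):
        self._fn = fn
        self._num_threads = num_threads
        self._batch_size = batch_size

    def expand(self, pcoll):
        return (
            pcoll
            | beam.BatchElements(min_batch_size=self._batch_size,
                                 max_batch_size=self._batch_size)
            | beam.ParDo(_MapBatchWithThreads(self._fn, self._num_threads))
        )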

Let me know if you think this might be of interest upstream in Beam. I
agree that in the long term this makes sense to be implemented in runners,
though I guess that might be more challenging to implement.


>> One alternative would be supporting this sort of concurrency control
>> inside Beam runners. In principle, I imagine runners could tune thread-pool
>> size for each stage automatically, e.g., based on CPU usage. To be honest,
>> I'm a little surprised this doesn't happen already, but I'm sure there are
>> good reasons why not.
>>
>
> Runner support would be the ideal solution. Because runners could decide
> on the most optimal pool size based on the real time information.
> Supporting and using annotations would provide helpful hints for the
> runners. At least the latter part is in progress IIRC.
>
>
>>
>> Let me know what you think!
>>
>> Cheers,
>> Stephan
>>
>>


Beam Summit 2021 - Call for Proposals

2021-05-11 Thread Mara Ruvalcaba

Hi Beam Community!

We are excited to announce Beam Summit 2021!
Beam Summit will happen from August 4th - 6th, 2021, and it will be held 
online.



We want to hear about your experience with Beam! You are more than 
welcome to share with the community. Send a proposal 
now: https://sessionize.com/beam-digital-summit-2021 



Attend the event, where you will be able to learn from the community and 
network with your peers. Register for the event 
now: https://2021.beamsummit.org/ 


Beam Summit organizers.

CFP Beam Summit



Flaky test issue report

2021-05-11 Thread Beam Jira Bot
This is your daily summary of Beam's current flaky tests. These are P1 issues 
because they have a major negative impact on the community and make it hard to 
determine the quality of the software.

BEAM-12322: FnApiRunnerTestWithGrpcAndMultiWorkers flaky (py precommit) 
(https://issues.apache.org/jira/browse/BEAM-12322)
BEAM-12311: Python PostCommit are close to timeout 
(https://issues.apache.org/jira/browse/BEAM-12311)
BEAM-12309: PubSubIntegrationTest.test_streaming_data_only flake 
(https://issues.apache.org/jira/browse/BEAM-12309)
BEAM-12307: PubSubBigQueryIT.test_file_loads flake 
(https://issues.apache.org/jira/browse/BEAM-12307)
BEAM-12303: Flake in PubSubIntegrationTest.test_streaming_with_attributes 
(https://issues.apache.org/jira/browse/BEAM-12303)
BEAM-12293: FlinkSavepointTest.testSavepointRestoreLegacy flakes due to 
FlinkJobNotFoundException (https://issues.apache.org/jira/browse/BEAM-12293)
BEAM-12291: 
org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
false] is flaky (https://issues.apache.org/jira/browse/BEAM-12291)
BEAM-12250: Java ValidatesRunner Postcommits timing out 
(https://issues.apache.org/jira/browse/BEAM-12250)
BEAM-12200: SamzaStoreStateInternalsTest is flaky 
(https://issues.apache.org/jira/browse/BEAM-12200)
BEAM-12163: Python GHA PreCommits flake with grpc.FutureTimeoutError on SDK 
harness startup (https://issues.apache.org/jira/browse/BEAM-12163)
BEAM-12061: beam_PostCommit_SQL failing on 
KafkaTableProviderIT.testFakeNested 
(https://issues.apache.org/jira/browse/BEAM-12061)
BEAM-12019: 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky (https://issues.apache.org/jira/browse/BEAM-12019)
BEAM-11792: Python precommit failed (flaked?) installing package  
(https://issues.apache.org/jira/browse/BEAM-11792)
BEAM-11666: 
apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution
 is flaky (https://issues.apache.org/jira/browse/BEAM-11666)
BEAM-11662: elasticsearch tests failing 
(https://issues.apache.org/jira/browse/BEAM-11662)
BEAM-11661: hdfsIntegrationTest flake: network not found (py38 postcommit) 
(https://issues.apache.org/jira/browse/BEAM-11661)
BEAM-11646: beam_PostCommit_XVR_Spark failing 
(https://issues.apache.org/jira/browse/BEAM-11646)
BEAM-11645: beam_PostCommit_XVR_Flink failing 
(https://issues.apache.org/jira/browse/BEAM-11645)
BEAM-11541: testTeardownCalledAfterExceptionInProcessElement flakes on 
direct runner. (https://issues.apache.org/jira/browse/BEAM-11541)
BEAM-11540: Linter sometimes flakes on apache_beam.dataframe.frames_test 
(https://issues.apache.org/jira/browse/BEAM-11540)
BEAM-10995: Java + Universal Local Runner: 
WindowingTest.testWindowPreservation fails 
(https://issues.apache.org/jira/browse/BEAM-10995)
BEAM-10987: stager_test.py::StagerTest::test_with_main_session flaky on 
windows py3.6,3.7 (https://issues.apache.org/jira/browse/BEAM-10987)
BEAM-10968: flaky test: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedDistributionMetrics
 (https://issues.apache.org/jira/browse/BEAM-10968)
BEAM-10955: Flink Java Runner test flake: Could not find Flink job  
(https://issues.apache.org/jira/browse/BEAM-10955)
BEAM-10923: Python requirements installation in docker container is flaky 
(https://issues.apache.org/jira/browse/BEAM-10923)
BEAM-10899: test_FhirIO_exportFhirResourcesGcs flake with OOM 
(https://issues.apache.org/jira/browse/BEAM-10899)
BEAM-10866: PortableRunnerTestWithSubprocesses.test_register_finalizations 
flaky on macOS (https://issues.apache.org/jira/browse/BEAM-10866)
BEAM-10590: BigQueryQueryToTableIT flaky: test_big_query_new_types 
(https://issues.apache.org/jira/browse/BEAM-10590)
BEAM-10519: 
MultipleInputsAndOutputTests.testParDoWithSideInputsIsCumulative flaky on Samza 
(https://issues.apache.org/jira/browse/BEAM-10519)
BEAM-10504: Failure / flake in ElasticSearchIOTest > 
testWriteFullAddressing and testWriteWithIndexFn 
(https://issues.apache.org/jira/browse/BEAM-10504)
BEAM-10501: CheckGrafanaStalenessAlerts and PingGrafanaHttpApi fail with 
Connection refused (https://issues.apache.org/jira/browse/BEAM-10501)
BEAM-10485: Failure / flake: ElasticsearchIOTest > testWriteWithIndexFn 
(https://issues.apache.org/jira/browse/BEAM-10485)
BEAM-10272: Failure in CassandraIOTest init: cannot create cluster due to 
netty link error (https://issues.apache.org/jira/browse/BEAM-10272)
BEAM-9649: beam_python_mongoio_load_test started failing due to mismatched 
results (https://issues.apache.org/jira/browse/BEAM-9649)
BEAM-9392: TestStream tests are all flaky 
(https://issues.apache.org/jira/browse/BEAM-9392)
BEAM-9232: BigQueryWriteIntegrationTests is flaky coercing to Unicode 
(https://issues.apache.org/jira/browse/BEAM-9232)
BEAM-9119: 
apach

P1 issues report

2021-05-11 Thread Beam Jira Bot
This is your daily summary of Beam's current P1 issues, not including flaky 
tests.

See https://beam.apache.org/contribute/jira-priorities/#p1-critical for the 
meaning and expectations around P1 issues.

BEAM-12324: TranslationsTest.test_run_packable_combine_* failing on 
PostCommit_Py_VR_Dataflow (https://issues.apache.org/jira/browse/BEAM-12324)
BEAM-12321: Failure in test_run_packable_combine_per_key and 
test_run_packable_combine_globally 
(https://issues.apache.org/jira/browse/BEAM-12321)
BEAM-12320: PubsubTableProviderIT.testSQLSelectsArrayAttributes[0] failing 
in SQL PostCommit (https://issues.apache.org/jira/browse/BEAM-12320)
BEAM-12316: LGPL in bundled dependencies 
(https://issues.apache.org/jira/browse/BEAM-12316)
BEAM-12310: beam_PostCommit_Java_DataflowV2 failing 
(https://issues.apache.org/jira/browse/BEAM-12310)
BEAM-12308: CrossLanguageKafkaIOTest.test_kafkaio flake 
(https://issues.apache.org/jira/browse/BEAM-12308)
BEAM-12290: TestPubsub.assertThatSubscriptionEventuallyCreated timeout does 
not work (https://issues.apache.org/jira/browse/BEAM-12290)
BEAM-12287: beam_PerformanceTests_Kafka_IO failing due to 
:sdks:java:container:pullLicenses failure 
(https://issues.apache.org/jira/browse/BEAM-12287)
BEAM-12279: Implement destination-dependent sharding in FileIO.writeDynamic 
(https://issues.apache.org/jira/browse/BEAM-12279)
BEAM-12258: SQL postcommit timing out 
(https://issues.apache.org/jira/browse/BEAM-12258)
BEAM-12256: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails 
to decode some Avro logical types 
(https://issues.apache.org/jira/browse/BEAM-12256)
BEAM-12231: beam_PostRelease_NightlySnapshot failing 
(https://issues.apache.org/jira/browse/BEAM-12231)
BEAM-1: Dataflow side input translation "Unknown producer for value" 
(https://issues.apache.org/jira/browse/BEAM-1)
BEAM-11959: Python Beam SDK Harness hangs when installing pip packages 
(https://issues.apache.org/jira/browse/BEAM-11959)
BEAM-11906: No trigger early repeatedly for session windows 
(https://issues.apache.org/jira/browse/BEAM-11906)
BEAM-11875: XmlIO.Read does not handle XML encoding per spec 
(https://issues.apache.org/jira/browse/BEAM-11875)
BEAM-11828: JmsIO is not acknowledging messages correctly 
(https://issues.apache.org/jira/browse/BEAM-11828)
BEAM-11755: Cross-language consistency (RequiresStableInputs) is quietly 
broken (at least on portable flink runner) 
(https://issues.apache.org/jira/browse/BEAM-11755)
BEAM-11578: `dataflow_metrics` (python) fails with TypeError (when int 
overflowing?) (https://issues.apache.org/jira/browse/BEAM-11578)
BEAM-11576: Go ValidatesRunner failure: TestFlattenDup on Dataflow Runner 
(https://issues.apache.org/jira/browse/BEAM-11576)
BEAM-11434: Expose Spanner admin/batch clients in Spanner Accessor 
(https://issues.apache.org/jira/browse/BEAM-11434)
BEAM-11227: Upgrade beam-vendor-grpc-1_26_0-0.3 to fix CVE-2020-27216 
(https://issues.apache.org/jira/browse/BEAM-11227)
BEAM-11148: Kafka commitOffsetsInFinalize OOM on Flink 
(https://issues.apache.org/jira/browse/BEAM-11148)
BEAM-11017: Timer with dataflow runner can be set multiple times (dataflow 
runner) (https://issues.apache.org/jira/browse/BEAM-11017)
BEAM-10861: Adds URNs and payloads to PubSub transforms 
(https://issues.apache.org/jira/browse/BEAM-10861)
BEAM-10670: Make non-portable Splittable DoFn the only option when 
executing Java "Read" transforms 
(https://issues.apache.org/jira/browse/BEAM-10670)
BEAM-10617: python CombineGlobally().with_fanout() cause duplicate combine 
results for sliding windows (https://issues.apache.org/jira/browse/BEAM-10617)
BEAM-10569: SpannerIO tests don't actually assert anything. 
(https://issues.apache.org/jira/browse/BEAM-10569)
BEAM-10288: Quickstart documents are out of date 
(https://issues.apache.org/jira/browse/BEAM-10288)
BEAM-10244: Populate requirements cache fails on poetry-based packages 
(https://issues.apache.org/jira/browse/BEAM-10244)
BEAM-10100: FileIO writeDynamic with AvroIO.sink not writing all data 
(https://issues.apache.org/jira/browse/BEAM-10100)
BEAM-9564: Remove insecure ssl options from MongoDBIO 
(https://issues.apache.org/jira/browse/BEAM-9564)
BEAM-9455: Environment-sensitive provisioning for Dataflow 
(https://issues.apache.org/jira/browse/BEAM-9455)
BEAM-9293: Python direct runner doesn't emit empty pane when it should 
(https://issues.apache.org/jira/browse/BEAM-9293)
BEAM-8986: SortValues may not work correct for numerical types 
(https://issues.apache.org/jira/browse/BEAM-8986)
BEAM-8985: SortValues should fail if SecondaryKey coder is not 
deterministic (https://issues.apache.org/jira/browse/BEAM-8985)
BEAM-8407: [SQL] Some Hive tests throw NullPointerException, but get marked 
as passing (Direct Runner) (https://issues.apache.org/jira/browse/BEAM-8407)
BEAM-7717: Pubsub

BeamSQL: Error when using WHERE statements with OVER windows

2021-05-11 Thread Burkay Gur
Hi folks,

When we try to run the following query on BeamSQL:

SELECT item, purchases, category, sum(purchases) over (PARTITION BY
category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) as total_purchases  FROM PCOLLECTION WHERE purchases > 3

We are getting the following error:

Unable to convert query
org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
convert query SELECT item, purchases, category, sum(purchases) over
(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW) as total_purchases  FROM PCOLLECTION WHERE purchases > 3
at
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:212)
at
org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:111)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548) at
org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499) at
org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370) at

We traced the issue back to this PR:
https://github.com/apache/beam/pull/11975 specifically this line:
https://github.com/apache/beam/pull/11975/files#diff-919be1e4bcc11c17b725cbf04168b583886ffe16286f9291893247954128ad81R43

What are the plans for wider support of analytical functions? If I want
to contribute, what are the best resources to learn more about how the
BeamSQL / Calcite integration is set up?

Best,
Burkay


Re: [EXT] Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-05-11 Thread Kenneth Knowles
+dev 

In the Beam Java ecosystem, this functionality is provided by the Sorter
library (https://beam.apache.org/documentation/sdks/java-extensions/#sorter).
I'm curious what people think about various options:

 - Python version of the transform(s)
 - Expose sorter as xlang transform(s)
 - Convenience transforms (that use pandas in DoFns?) to just do it for
small data per key to achieve compatibility (see the rough sketch after
this list)
 - Beam model extension so that runners can do it as part of GBK
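
For the third option, a rough sketch of what such a convenience transform
could look like in Python (illustrative only; it uses plain sorted() rather
than pandas and assumes each key's values fit in memory):

import apache_beam as beam


class SortValuesPerKey(beam.PTransform):
    """Sorts each key's values in memory; assumes small data per key."""

    def __init__(self, sort_key):
        self._sort_key = sort_key

    def expand(self, pcoll):
        sort_key = self._sort_key  # capture locally for the lambda
        return (
            pcoll  # a PCollection of (key, value) pairs
            | beam.GroupByKey()
            | beam.MapTuple(lambda k, values: (k, sorted(values, key=sort_key)))
        )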

Kenn

On Mon, May 10, 2021 at 5:26 PM Wenbing Bai 
wrote:

> Hi Robert and Brian,
>
> I don't know why I didn't catch your replies. But thank you so much for
> looking at this.
>
> My parquet files will be consumed by downstream processes which require
> data points with the same "key1" that are sorted by "key2". The
> downstream process, for example, will make a rolling window with size N
> that reads N records together at one time. But note, the rolling window
> will not cross different "key1".
>
> So that is to say: 1) I don't need to sort the whole dataset, and 2) all data
> with the same "key1" should be located together.
>
> I am not sure if I explain the use case clearly. Let me know what you
> think.
>
> Wenbing
>
>
> On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw 
> wrote:
>
>> It would also be helpful to understand what your overall objective is
>> with this output. Is there a reason you need it sorted/partitioned in a
>> certain way?
>>
>> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette 
>> wrote:
>>
>>> Hi Wenbing,
>>> Sorry for taking so long to get back to you on this.
>>> I discussed this with Robert offline and we came up with a potential
>>> workaround - you could try writing out the Parquet file from within the
>>> groupby.apply method. You can use beam's FileSystems abstraction to open a
>>> Python file object referencing a cloud storage file, and pass that file
>>> object directly to the pandas to_parquet. It would look something like this:
>>>
>>>   df.groupby('key1').apply(lambda df:
>>> df.sort_values(by='key2').to_parquet(FileSystems.open("gs://bucket/file.pq")))
>>>
>>> If writing out sorted, partitioned parquet files is a common use-case we
>>> should think about making this easier though. At the very least
>>> partition_cols should work, I filed BEAM-12201 [1] for this. That alone
>>> won't be enough as our implementation will likely reshuffle the dataset to
>>> enforce the partitioning, removing any sorting that you've applied, so we'd
>>> also need to think about how to optimize the pipeline to avoid that shuffle.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-12201
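
A slightly fuller, untested sketch of this workaround that writes each sorted
group to its own file. The bucket path, the key1/key2 column names, and the
use of FileSystems.create to obtain a writable file object are assumptions.

from apache_beam.io.filesystems import FileSystems


def write_group(group_df):
    # One output object per key1 value; the path pattern is just an example.
    key1 = group_df['key1'].iloc[0]
    path = 'gs://bucket/output/key1={}.parquet'.format(key1)
    f = FileSystems.create(path)
    try:
        # to_parquet with the pyarrow engine accepts a file-like object.
        group_df.sort_values(by='key2').to_parquet(
            f, engine='pyarrow', index=False)
    finally:
        f.close()
    return group_df


# df = to_dataframe(pcoll)
# df.groupby('key1').apply(write_group)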
>>>
>>> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai 
>>> wrote:
>>>
 Thank you, Brian. I tried `partition_cols`, but it is not working. I
 tried pure pandas, and it does work, so I am not sure if anything is wrong with
 Beam.

 Wenbing

 On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette 
 wrote:

> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
> through [2]. It would be interesting to see what  `partition_cols='key1'`
> does - I suspect it won't work perfectly though.
>
> Do you have any thoughts here Robert?
>
> [1]
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
> [2]
> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>
> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
> wrote:
>
>> Hi Robert and Brian,
>>
>> I tried groupby in my case. Here is my pipeline code. I do see that all
>> the data in the final parquet file are sorted within each group. However,
>> I'd like to write each partition (group) to an individual file; how can I
>> achieve it? In addition, I am using the master branch of the Apache Beam
>> SDK; how can I test the pipeline with DataflowRunner, considering there is
>> no Dataflow worker image available?
>>
>> data = [
>>     {
>>         "key1": 1000 + i % 10,
>>         "key2": randrange(1),
>>         "feature_1": "somestring{}".format(i)
>>     } for i in range(1)
>> ]
>>
>> class TestRow(typing.NamedTuple):
>>     key1: int
>>     key2: int
>>     feature_1: str
>>
>> with beam.Pipeline() as p:
>>     pcoll = (
>>         p
>>         | beam.Create(data)
>>         | beam.Map(lambda x: x).with_output_types(TestRow)
>>     )
>>
>>     df = to_dataframe(pcoll)
>>     sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by='key2'))
>>     sorted_df.to_parquet(
>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>         engine='pyarrow', index=False)
>>
>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <
>> wenbing@getcruise.com> wrote:
>>
>>> Thank you, Robert and Brian.
>>>
>>> I'd like to try this out. I am trying to distribute my dataset to
>>> nodes, sort each partition by some key and then store each 

Re: Some questions around GroupIntoBatches

2021-05-11 Thread Reuven Lax
On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles  wrote:

>
>
> On Mon, May 10, 2021 at 7:40 PM Reuven Lax  wrote:
>
>> Hi,
>>
>> I've been looking at the implementation of GroupIntoBatches (hoping to
>> add support to group based on byte size), and I have a few questions about
>> the current implementation.
>>
>> 1. I noticed that the transform does not preserve input timestamps. The
>> output timestamp is not explicitly set, so it will be whatever the default
>> output timestamp is. Confusingly, this will depend on which path is taken. If
>> the batch is output while processing an element, then the timestamp of that
>> element will be used. If the batch is output from the timer, then the
>> processing-time value of the timer will be used.
>>
>
> Seems like bugs.
>
>
>>  - Should I start setting outputTimestamp explicitly - ideally to the
>> minimum timestamp in the current batch of elements?
>>
>
> That's a sensible default. Could pass a TimestampCombiner if that leads to
> watermark delays. Given the nature of the transform, the simple approach
> will probably be fine.
>

This is a bit tricky to do. In order to update the output timestamp, we
need to call setTimer again. However, since we are calling
timer.offset().setRelative(), this will keep bumping the timer into the
future and it will never fire.

One solution would be for GIB to just store the current timer timestamp in
state, and make sure to keep setting the same target until the timer fires.
However, that precludes us from using setRelative (since that method does
not allow the DoFn to see what target time is being set). I think that this
approach won't play nice with TestStream.advanceProcessingTimeTo.

We could also add a way to update _just_ the output time for a timer
without resetting the entire timer.


>
 - Should we have the option to preserve all element timestamps? This
>> could look something like this:
>>
>> PCollection<KV<K, Iterable<TimestampedValue<V>>>> batches =
>>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>
>
> This seems useful.
>
>
>> 2. flushBatch always resets the timer, even after the batch is processed.
>> The only reason I can think of for doing so is to update the watermark
>> hold. TimerInternals contains a deleteTimer method - is there any reason we
>> shouldn't simply implement Timer.clear and hook it up to
>> the existing deleteTimer?
>>
>
> Makes sense to me. Prior thread on this seemed to have lightweight
> consensus:
> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>
>
>> 3. This transform was implemented before OnWindowExpiration was
>> implemented. I think we should add a window-expiration callback, and stop
>> setting the end-of-window timer.
>>
>
> +1
>
> Kenn
>


Re: Upgrading vendored gRPC from 1.26.0 to 1.36.0

2021-05-11 Thread Tomo Suzuki
Thank you for the advice. Yes, the latch not being counted down is the
problem. (my memo:
https://github.com/apache/beam/pull/14474#discussion_r619557479 ) I'll need
to figure out why withOnError is not called.


> Can you repro locally?

No, the task succeeds in my environment (./gradlew
:runners:google-cloud-dataflow-java:worker:test).


On Tue, May 11, 2021 at 12:34 PM Kenneth Knowles  wrote:

> I am not sure how much you read the code of the test. So apologies if I am
> saying things you already know. The test does something like:
>
>  - start a logging service
>  - set up some stub clients, each with onError wired up to release a
> countdown latch
>  - send error responses to all three of them (actually it sends the error
> in the same task it creates the stub)
>  - each task waits on the latch
>
> So if onError does not deliver or does not call to release the countdown
> latch, it will hang. I notice in the gist you provide that all three stub
> clients are hung awaiting the latch. That is suspicious to me. I would want
> to confirm if the flakiness always occurs in a way that hangs all three.
> Then there are gRPC workers waiting on empty queues, and the main test
> thread waiting for the hung tasks to complete.
>
> The problem could be something about the test set up. Personally I would
> add a ton of logs, or potentially use a debugger, to confirm exactly the
> state of things when it hangs. Can you repro locally? I think this same
> functionality could be tested in different ways that might remove some of
> the variables. For example starting up all the waiting tasks, then sending
> all the onError messages that should cause them to terminate.
>
> Since this is a unit test, adding a timeout to just that method should
> save time (but will make it harder to capture stack traces, etc). I've
> opened up https://github.com/apache/beam/pull/14781 for that. There may
> be a nice way to add a timeout to the executor to capture the hung stack,
> but I didn't look for it.
>
> Kenn
>
> On Tue, May 11, 2021 at 7:36 AM Tomo Suzuki  wrote:
>
>> gRPC 1.37.0 showed the same problem:
>> BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
>> waits for tasks forever, causing a timeout in the Java precommit.
>>
>> While I continue my investigation, I would appreciate it if someone knows
>> the cause of the problem. I pasted the thread dump of the Java process when
>> the test was frozen:
>> https://github.com/apache/beam/pull/14768
>>
>> If this mystery is never solved, vendoring (a bit old) gRPC 1.32.2
>> without the jboss dependencies is an alternate option (suggestion by Kenn;
>> memo
>> 
>> )
>>
>> Regards,
>> Tomo
>>
>>
>> On Mon, May 10, 2021 at 9:40 AM Tomo Suzuki  wrote:
>>
>>> I was investigating the strange timeout (
>>> https://github.com/apache/beam/pull/14474) but was occupied with
>>> something else lately.
>>> Let me try the new version today to see any improvements.
>>>
>>>
>>> On Mon, May 10, 2021 at 4:57 AM Ismaël Mejía  wrote:
>>>
 I just saw that gRPC 1.37.1 is out now (and with aarch64 support for
 python!) that made me wonder about this, what is the current status of
 upgrading the vendored dependency Tomo?


 On Thu, Apr 8, 2021 at 4:16 PM Tomo Suzuki  wrote:

> We observed the cron job of Java Precommit for the master branch
> started timing out often (not always) since upgrading the gRPC version.
> https://github.com/apache/beam/pull/14466#issuecomment-815343974
>
> Exchanged messages with Kenn, I reverted to the change; now the master
> branch uses the vendored gRPC 1.26.
>
>
> On Wed, Mar 31, 2021 at 11:40 AM Kenneth Knowles 
> wrote:
>
>> Merged. Let's keep an eye for trouble, and I will incorporate to the
>> release branch.
>>
>> Kenn
>>
>> On Wed, Mar 31, 2021 at 6:45 AM Tomo Suzuki 
>> wrote:
>>
>>> Regarding troubleshooting on build timeout, it seems that Docker
>>> cache in Jenkins machines might be playing a role. As I run more "Java
>>> Presubmit", I no longer observe timeouts in the PR.
>>>
>>> Kenn, would you merge the PR?
>>> https://github.com/apache/beam/pull/14295 (all checks green,
>>> including the new Java postcommit checks)
>>>
>>> On Thu, Mar 25, 2021 at 5:24 PM Kenneth Knowles 
>>> wrote:
>>>
 Yes, I agree this might be a good idea. This is not the only major
 issue on the release-2.29.0 branch.

 The counter argument is that we will be pulling in all the bugs
 introduced to `master` since the branch cut.

 As far as effort goes, I have been mostly focused on burning down
 the bugs so I would not lose much work in the release process.

 Kenn

 On Thu, Mar 25,

Re: LGPL-2.1 in beam-vendor-grpc

2021-05-11 Thread Kenneth Knowles
+1

It seems we are pretty close on the upgrade. The same tricky problem as
before, but it seems to be narrowed down.

Kenn

On Mon, May 10, 2021 at 8:26 AM Jean-Baptiste Onofre 
wrote:

> +1 fully agree.
>
> Regards
> JB
>
> On 10 May 2021 at 16:02, Jan Lukavský wrote:
>
> +1 for blocking the release - I think we should not release something
> that we _know_ might be legally problematic. And we should
> definitely create a check in the build process that will warn about such
> issues in the future.
>
>  Jan
> On 5/10/21 3:44 PM, Ismaël Mejía wrote:
>
> Tomo just confirmed in the ticket that if we update the gRPC vendored
> version we won't need the JBoss dependency anymore so we should be good to
> go with the upgrade. The open question is if this should be blocking for
> the upcoming Beam 2.31.0 release or we can fix it afterwards.
>
>
> On Mon, May 10, 2021 at 2:46 PM Ismaël Mejía  wrote:
>
>> We have been discussing updating the vendored dependency in
>> BEAM-11227; if I remember correctly, the newer version of gRPC does not
>> require the jboss dependency, so that is probably the best upgrade path.
>> Can you confirm, Tomo Suzuki?
>>
>> On Mon, May 10, 2021 at 2:33 PM Jarek Potiuk  wrote:
>>
>>> Also we have very similar discussion about it in
>>> https://issues.apache.org/jira/browse/LEGAL-572
>>> Just to be clear about the context: it's not a legal requirement of the
>>> Apache License, it's Apache Software Foundation policy that we should
>>> not limit our users in using our software. If the LGPL dependency is
>>> "optional", it's fine to add such optional dependency. If it is "required"
>>> to run your software, then it is not allowed as it limits the users of ASF
>>> software in further redistributing the software in the way they want (this
>>> is at least my understanding of it).
>>>
>>> On Mon, May 10, 2021 at 12:58 PM JB Onofré  wrote:
>>>
 Hi

 You can take a look on

 https://www.apache.org/legal/resolved.html

 Regards
 JB

 On 10 May 2021 at 12:56, Elliotte Rusty Harold wrote:

 Anyone have a link to the official Apache policy about this? Thanks.

 On Mon, May 10, 2021 at 10:07 AM Jan Lukavský  wrote:


 Hi,

 we are bundling dependencies with LGPL-2.1, according to the license header
 in META-INF/maven/org.jboss.modules/jboss-modules/pom.xml. I think it
 might be an issue, already reported here: [1]. I created [2] to track it
 on our side.

  Jan

 [1] https://issues.apache.org/jira/browse/FLINK-22555

 [2] https://issues.apache.org/jira/browse/BEAM-12316




 --
 Elliotte Rusty Harold
 elh...@ibiblio.org


>>>
>>> --
>>> +48 660 796 129
>>>
>>
>


Re: Upgrading vendored gRPC from 1.26.0 to 1.36.0

2021-05-11 Thread Kenneth Knowles
I am not sure how much you read the code of the test. So apologies if I am
saying things you already know. The test does something like:

 - start a logging service
 - set up some stub clients, each with onError wired up to release a
countdown latch
 - send error responses to all three of them (actually it sends the error
in the same task it creates the stub)
 - each task waits on the latch

So if onError does not deliver or does not call to release the countdown
latch, it will hang. I notice in the gist you provide that all three stub
clients are hung awaiting the latch. That is suspicious to me. I would want
to confirm if the flakiness always occurs in a way that hangs all three.
Then there are gRPC workers waiting on empty queues, and the main test
thread waiting for the hung tasks to complete.

The problem could be something about the test set up. Personally I would
add a ton of logs, or potentially use a debugger, to confirm exactly the
state of things when it hangs. Can you repro locally? I think this same
functionality could be tested in different ways that might remove some of
the variables. For example starting up all the waiting tasks, then sending
all the onError messages that should cause them to terminate.

Since this is a unit test, adding a timeout to just that method should save
time (but will make it harder to capture stack traces, etc). I've opened up
https://github.com/apache/beam/pull/14781 for that. There may be a nice way
to add a timeout to the executor to capture the hung stack, but I didn't
look for it.

Kenn

On Tue, May 11, 2021 at 7:36 AM Tomo Suzuki  wrote:

> gRPC 1.37.0 showed the same problem:
> BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
> waits for tasks forever, causing a timeout in the Java precommit.
>
> While I continue my investigation, I would appreciate it if someone knows the
> cause of the problem. I pasted the thread dump of the Java process when the
> test was frozen:
> https://github.com/apache/beam/pull/14768
>
> If this mystery is never solved, vendoring (a bit old) gRPC 1.32.2 without
> the jboss dependencies is an alternate option (suggestion by Kenn; memo
> 
> )
>
> Regards,
> Tomo
>
>
> On Mon, May 10, 2021 at 9:40 AM Tomo Suzuki  wrote:
>
>> I was investigating the strange timeout (
>> https://github.com/apache/beam/pull/14474) but was occupied with
>> something else lately.
>> Let me try the new version today to see any improvements.
>>
>>
>> On Mon, May 10, 2021 at 4:57 AM Ismaël Mejía  wrote:
>>
>>> I just saw that gRPC 1.37.1 is out now (and with aarch64 support for
>>> python!) that made me wonder about this, what is the current status of
>>> upgrading the vendored dependency Tomo?
>>>
>>>
>>> On Thu, Apr 8, 2021 at 4:16 PM Tomo Suzuki  wrote:
>>>
 We observed the cron job of Java Precommit for the master branch
 started timing out often (not always) since upgrading the gRPC version.
 https://github.com/apache/beam/pull/14466#issuecomment-815343974

 Exchanged messages with Kenn, I reverted to the change; now the master
 branch uses the vendored gRPC 1.26.


 On Wed, Mar 31, 2021 at 11:40 AM Kenneth Knowles 
 wrote:

> Merged. Let's keep an eye for trouble, and I will incorporate to the
> release branch.
>
> Kenn
>
> On Wed, Mar 31, 2021 at 6:45 AM Tomo Suzuki 
> wrote:
>
>> Regarding troubleshooting on build timeout, it seems that Docker
>> cache in Jenkins machines might be playing a role. As I run more "Java
>> Presubmit", I no longer observe timeouts in the PR.
>>
>> Kenn, would you merge the PR?
>> https://github.com/apache/beam/pull/14295 (all checks green,
>> including the new Java postcommit checks)
>>
>> On Thu, Mar 25, 2021 at 5:24 PM Kenneth Knowles 
>> wrote:
>>
>>> Yes, I agree this might be a good idea. This is not the only major
>>> issue on the release-2.29.0 branch.
>>>
>>> The counter argument is that we will be pulling in all the bugs
>>> introduced to `master` since the branch cut.
>>>
>>> As far as effort goes, I have been mostly focused on burning down
>>> the bugs so I would not lose much work in the release process.
>>>
>>> Kenn
>>>
>>> On Thu, Mar 25, 2021 at 1:42 PM Ismaël Mejía 
>>> wrote:
>>>
 Precommit is quite unstable in the last days, so worth to check if
 something is wrong in the CI.

 I have a question Kenn. Given that cherry picking this might be a
 bit
 big as a change can we just reconsider cutting the 2.29.0 branch
 again
 after the updated gRPC version use gets merged and mark the issues
 already fixed for version 2.30.0 to version 2.29.0 ? Seems like an
 easier upgrade path (and 

Re: Ordered PCollections eventually?

2021-05-11 Thread Jan Lukavský
I'll just note that Beam already supports the (experimental)
@RequiresTimeSortedInput (which has several limitations, mostly in that
it orders only by timestamp and not by some - time related - user field;
and of course - missing retractions). Arbitrary sorting seems to be
hard, even per-key; it seems it will always have to be somewhat
time-bounded, as otherwise it might require unbounded state. The batch
case, on the other hand, typically has a way to order inputs arbitrarily
with virtually zero cost, as many implementations use sort-merge-group
to perform reduction operations.


 Jan

On 5/11/21 5:56 PM, Kenneth Knowles wrote:
Per-key ordered delivery makes a ton of sense. I'd guess CDC has the 
same needs as retractions, so that the changelog can be applied in 
order as it arrives. And since it is per-key you still get parallelism.


Global ordering is quite different. I know that SQL and Dataframes 
have global sorting operations. The question has always been how does 
"embarassingly paralllel" processing interact with sorting/ordering. I 
imagine some other systems have the features so we can look at how it 
is used?


Kenn

On Mon, May 10, 2021 at 4:39 PM Sam Rohde wrote:


Awesome, thanks Pablo!

On Mon, May 10, 2021 at 4:05 PM Pablo Estrada wrote:

CDC would also benefit. I am working on a proposal for this
that is concerned with streaming pipelines, and per-key
ordered delivery. I will share with you as soon as I have a
draft.
Best
-P.

On Mon, May 10, 2021 at 2:56 PM Reuven Lax wrote:

There has been talk, but nothing concrete.

On Mon, May 10, 2021 at 1:42 PM Sam Rohde wrote:

Hi All,

I was wondering if there had been any plans for
creating ordered PCollections in the Beam model? Or if
there might be plans for them in the future?

I know that Beam SQL and Beam DataFrames would
directly benefit from an ordered PCollection.

Regards,
Sam



Re: Some questions around GroupIntoBatches

2021-05-11 Thread Kenneth Knowles
On Mon, May 10, 2021 at 7:40 PM Reuven Lax  wrote:

> Hi,
>
> I've been looking at the implementation of GroupIntoBatches (hoping to add
> support to group based on byte size), and I have a few questions about the
> current implementation.
>
> 1. I noticed that the transform does not preserve input timestamps. The
> output timestamp is not explicitly set, so it will be whatever the default
> output timestamp is. Confusingly, this will depend on which path is taken. If
> the batch is output while processing an element, then the timestamp of that
> element will be used. If the batch is output from the timer, then the
> processing-time value of the timer will be used.
>

Seems like bugs.


>  - Should I start setting outputTimestamp explicitly - ideally to the
> minimum timestamp in the current batch of elements?
>

That's a sensible default. Could pass a TimestampCombiner if that leads to
watermark delays. Given the nature of the transform, the simple approach
will probably be fine.

 - Should we have the option to preserve all element timestamps? This
> could look something like this:
>
> PCollection<KV<K, Iterable<TimestampedValue<V>>>> batches =
>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>

This seems useful.


> 2. flushBatch always resets the timer, even after the batch is processed.
> The only reason I can think of for doing so is to update the watermark
> hold. TimerInternals contains a deleteTimer method - is there any reason we
> shouldn't simply implement Timer.clear and hook it up to
> the existing deleteTimer?
>

Makes sense to me. Prior thread on this seemed to have lightweight
consensus:
https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E


> 3. This transform was implemented before OnWindowExpiration was
> implemented. I think we should add a window-expiration callback, and stop
> setting the end-of-window timer.
>

+1

Kenn


Re: Ordered PCollections eventually?

2021-05-11 Thread Kenneth Knowles
Per-key ordered delivery makes a ton of sense. I'd guess CDC has the same
needs as retractions, so that the changelog can be applied in order as it
arrives. And since it is per-key you still get parallelism.

Global ordering is quite different. I know that SQL and Dataframes have
global sorting operations. The question has always been how does
"embarassingly paralllel" processing interact with sorting/ordering. I
imagine some other systems have the features so we can look at how it is
used?

Kenn

On Mon, May 10, 2021 at 4:39 PM Sam Rohde  wrote:

> Awesome, thanks Pablo!
>
> On Mon, May 10, 2021 at 4:05 PM Pablo Estrada  wrote:
>
>> CDC would also benefit. I am working on a proposal for this that is
>> concerned with streaming pipelines, and per-key ordered delivery. I will
>> share with you as soon as I have a draft.
>> Best
>> -P.
>>
>> On Mon, May 10, 2021 at 2:56 PM Reuven Lax  wrote:
>>
>>> There has been talk, but nothing concrete.
>>>
>>> On Mon, May 10, 2021 at 1:42 PM Sam Rohde  wrote:
>>>
 Hi All,

 I was wondering if there had been any plans for creating ordered
 PCollections in the Beam model? Or if there might be plans for them in the
 future?

 I know that Beam SQL and Beam DataFrames would directly benefit from an
 ordered PCollection.

 Regards,
 Sam

>>>