Apply a Beam PTransform per key

2021-05-21 Thread Stephan Hoyer
I'd like to write a Beam PTransform that applies an *existing* Beam
transform to each set of grouped values, separately, and combines the
result. Is anything like this possible with Beam using the Python SDK?

Here are the closest things I've come up with:
1. If each set of *inputs* to my transform fit into memory, I could use
GroupByKey followed by FlatMap.
2. If each set of *outputs* from my transform fit into memory, I could use
CombinePerKey.
3. If I knew the static number of groups ahead of time, I could use
Partition, followed by applying my transform multiple times, followed by
Flatten.

In my scenario, none of these holds true. For example, currently I have ~20
groups of values, with each group holding ~1 TB of data. My custom
transform simply shuffles this TB of data around, so each set of outputs is
also 1TB in size.

In my particular case, it seems my options are to either relax these
constraints, or to manually convert each step of my existing transform to
apply per key. This conversion process is tedious, but very
straightforward, e.g., the GroupByKey and ParDo that my transform is built
out of just need to deal with an expanded key.

I wonder, could this be something built into Beam itself, e.g., as
TransformPerKey? The PTransforms that result from combining other Beam
transforms (e.g., _ChainPTransform in Python) are private, so this seems
like something that would need to exist in Beam itself, if it could exist
at all.

Cheers,
Stephan


Re: Proposal: Generalize S3FileSystem

2021-05-21 Thread Kenneth Knowles
Please follow the intent of URL syntax if at all possible. Specifically, the
bits before the ":" should indicate how to parse the rest of the URL, not
carry other information. Is this convention of sticking the host before the
":" already an established thing for S3-compatible endpoints?

If the various S3-compatible providers have their own schemes, is it
possible to just register the same code with different config for those
schemes and not invent any new URLs? That would be ideal.
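
To illustrate what I mean, here is a rough sketch of registering a
provider-specific scheme through the existing FileSystemRegistrar mechanism
(the class name and the "examples3" scheme are made up, and the method body is
a placeholder, so treat this as a sketch rather than working code):

    import com.google.auto.service.AutoService;
    import java.util.Collections;
    import org.apache.beam.sdk.io.FileSystem;
    import org.apache.beam.sdk.io.FileSystemRegistrar;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Sketch: a registrar for a hypothetical S3-compatible provider. It would
    // return the shared S3-style FileSystem, constructed with the provider's
    // endpoint taken from pipeline options, and reporting a provider-specific
    // scheme (say "examples3") from getScheme().
    @AutoService(FileSystemRegistrar.class)
    public class ExampleCompatFileSystemRegistrar implements FileSystemRegistrar {
      @Override
      public Iterable<FileSystem<?>> fromOptions(PipelineOptions options) {
        // Placeholder: the real registrar would return the configured filesystem here.
        return Collections.emptyList();
      }
    }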

Kenn

On Thu, May 20, 2021 at 2:30 PM Charles Chen  wrote:

> Is it feasible to keep the endpoint information in the path?  It seems
> pretty desirable to keep URIs "universal" so that it's possible to
> understand what is being pointed to without explicit service configuration,
> so maybe you can have a scheme like "s3+endpoint=api.example.com://my/bucket/path"?
>
> On Thu, May 20, 2021 at 12:31 PM Kenneth Knowles  wrote:
>
>> $.02
>>
>> Most important is a community to maintain it. It cannot be a separate
>> project or subproject (lots of ASF projects have these, so they share
>> governance) without that.
>>
>> Adding the friction of a separate release and build dependency before you
>> have that community means it should be extremely stable so you upgrade
>> rarely. See the process of upgrading our vendored deps. It is considerable.
>>
>> Kenn
>>
>> On Thu, May 20, 2021 at 12:07 PM Stephan Hoyer  wrote:
>>
>>> On Thu, May 20, 2021 at 10:12 AM Chad Dombrova 
>>> wrote:
>>>
 Hi Brian,
 I think the main goal would be to make a python package that could be
 pip installed independently of apache_beam.  That goal could be
 accomplished with option 3, thus preserving all of the benefits of a
 monorepo. If it gains enough popularity and contributors outside of the
 Beam community, then options 1 and 2 could be considered to make it easier
 to foster a new community of contributors.

>>>
>>> This sounds like a lovely goal!
>>>
>>> I'll just mention the "fsspec" Python project, which came out of Dask:
>>> https://filesystem-spec.readthedocs.io/en/latest/
>>>
>>> As far as I can tell, it serves basically this exact same purpose
>>> (generic filesystems with high-performance IO), and has started to get some
>>> traction in other projects, e.g., it's now used in pandas. I don't know if
>>> it would be suitable for Beam, but it might be worth a try.
>>>
>>> Cheers,
>>> Stephan
>>>
>>>
 Beam has a lot of great tech in it, and it makes me think of Celery,
 which is a much older python project of a similar ilk that spawned a series
 of useful independent projects: kombu [1], an AMQP messaging library, and
 billiard [2], a multiprocessing library.

 Obviously, there are a number of pros and cons to consider.  The cons
 are pretty clear: even within a monorepo it will make the Beam build more
 complicated.  The pros are a bit more abstract.  The fileIO project could
 appeal to a broader audience, and act as a signpost for Beam (on PyPI,
 etc), thereby increasing awareness of Beam amongst the types of
 cloud-friendly python developers who would need the fileIO package.

 -chad

 [1] https://github.com/celery/kombu
 [2] https://github.com/celery/billiard




 On Thu, May 20, 2021 at 7:57 AM Brian Hulette 
 wrote:

> That's an interesting idea. What do you mean by its own project? A
> couple of possibilities:
> - Spinning off a new ASF project
> - A separate Beam-governed repository (e.g. apache/beam-filesystems)
> - More clearly separate it in the current build system and release
> artifacts that allow it to be used independently
>
> Personally I'd be resistant to the first two (I am a Google engineer
> and I like monorepos after all), but I don't see a major problem with the
> last one, except that it gives us another surface to maintain.
>
> Brian
>
> On Wed, May 19, 2021 at 8:38 PM Chad Dombrova 
> wrote:
>
>> This is a random idea, but the whole file IO system inside Beam would
>> actually be awesome to extract into its own project.  IIRC, it’s not
>> particularly tied to Beam.
>>
>> I’m not saying this should be done now, but it’d be nice to keep it in
>> mind as a future goal.
>>
>> -chad
>>
>>
>>
>> On Wed, May 19, 2021 at 10:23 AM Pablo Estrada 
>> wrote:
>>
>>> That would be great to add, Matt. Of course it's important to make
>>> this backwards compatible, but other than that, the addition would be 
>>> very
>>> welcome.
>>>
>>> On Wed, May 19, 2021 at 9:41 AM Matt Rudary <
>>> matt.rud...@twosigma.com> wrote:
>>>
 Hi,



 This is a quick sketch of a proposal – I wanted to get a sense of
 whether there’s general support for this idea before fleshing it out
 further, getting internal approvals, etc.




Flaky test issue report

2021-05-21 Thread Beam Jira Bot
This is your daily summary of Beam's current flaky tests. These are P1 issues 
because they have a major negative impact on the community and make it hard to 
determine the quality of the software.

BEAM-12322: FnApiRunnerTestWithGrpcAndMultiWorkers flaky (py precommit) 
(https://issues.apache.org/jira/browse/BEAM-12322)
BEAM-12309: PubSubIntegrationTest.test_streaming_data_only flake 
(https://issues.apache.org/jira/browse/BEAM-12309)
BEAM-12307: PubSubBigQueryIT.test_file_loads flake 
(https://issues.apache.org/jira/browse/BEAM-12307)
BEAM-12303: Flake in PubSubIntegrationTest.test_streaming_with_attributes 
(https://issues.apache.org/jira/browse/BEAM-12303)
BEAM-12293: FlinkSavepointTest.testSavepointRestoreLegacy flakes due to 
FlinkJobNotFoundException (https://issues.apache.org/jira/browse/BEAM-12293)
BEAM-12291: 
org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
false] is flaky (https://issues.apache.org/jira/browse/BEAM-12291)
BEAM-12200: SamzaStoreStateInternalsTest is flaky 
(https://issues.apache.org/jira/browse/BEAM-12200)
BEAM-12163: Python GHA PreCommits flake with grpc.FutureTimeoutError on SDK 
harness startup (https://issues.apache.org/jira/browse/BEAM-12163)
BEAM-12061: beam_PostCommit_SQL failing on 
KafkaTableProviderIT.testFakeNested 
(https://issues.apache.org/jira/browse/BEAM-12061)
BEAM-12019: 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky (https://issues.apache.org/jira/browse/BEAM-12019)
BEAM-11792: Python precommit failed (flaked?) installing package  
(https://issues.apache.org/jira/browse/BEAM-11792)
BEAM-11666: 
apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution
 is flaky (https://issues.apache.org/jira/browse/BEAM-11666)
BEAM-11662: elasticsearch tests failing 
(https://issues.apache.org/jira/browse/BEAM-11662)
BEAM-11661: hdfsIntegrationTest flake: network not found (py38 postcommit) 
(https://issues.apache.org/jira/browse/BEAM-11661)
BEAM-11645: beam_PostCommit_XVR_Flink failing 
(https://issues.apache.org/jira/browse/BEAM-11645)
BEAM-11541: testTeardownCalledAfterExceptionInProcessElement flakes on 
direct runner. (https://issues.apache.org/jira/browse/BEAM-11541)
BEAM-11540: Linter sometimes flakes on apache_beam.dataframe.frames_test 
(https://issues.apache.org/jira/browse/BEAM-11540)
BEAM-10995: Java + Universal Local Runner: 
WindowingTest.testWindowPreservation fails 
(https://issues.apache.org/jira/browse/BEAM-10995)
BEAM-10987: stager_test.py::StagerTest::test_with_main_session flaky on 
windows py3.6,3.7 (https://issues.apache.org/jira/browse/BEAM-10987)
BEAM-10968: flaky test: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedDistributionMetrics
 (https://issues.apache.org/jira/browse/BEAM-10968)
BEAM-10955: Flink Java Runner test flake: Could not find Flink job  
(https://issues.apache.org/jira/browse/BEAM-10955)
BEAM-10866: PortableRunnerTestWithSubprocesses.test_register_finalizations 
flaky on macOS (https://issues.apache.org/jira/browse/BEAM-10866)
BEAM-10504: Failure / flake in ElasticSearchIOTest > 
testWriteFullAddressing and testWriteWithIndexFn 
(https://issues.apache.org/jira/browse/BEAM-10504)
BEAM-10501: CheckGrafanaStalenessAlerts and PingGrafanaHttpApi fail with 
Connection refused (https://issues.apache.org/jira/browse/BEAM-10501)
BEAM-10485: Failure / flake: ElasticsearchIOTest > testWriteWithIndexFn 
(https://issues.apache.org/jira/browse/BEAM-10485)
BEAM-9649: beam_python_mongoio_load_test started failing due to mismatched 
results (https://issues.apache.org/jira/browse/BEAM-9649)
BEAM-9392: TestStream tests are all flaky 
(https://issues.apache.org/jira/browse/BEAM-9392)
BEAM-9232: BigQueryWriteIntegrationTests is flaky coercing to Unicode 
(https://issues.apache.org/jira/browse/BEAM-9232)
BEAM-9119: 
apache_beam.runners.portability.fn_api_runner_test.FnApiRunnerTest[...].test_large_elements
 is flaky (https://issues.apache.org/jira/browse/BEAM-9119)
BEAM-8101: Flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful for 
Direct, Spark, Flink (https://issues.apache.org/jira/browse/BEAM-8101)
BEAM-8035: [beam_PreCommit_Java_Phrase] 
[WatchTest.testMultiplePollsWithManyResults]  Flake: Outputs must be in 
timestamp order (https://issues.apache.org/jira/browse/BEAM-8035)
BEAM-7992: Unhandled type_constraint in 
apache_beam.io.gcp.bigquery_write_it_test.BigQueryWriteIntegrationTests.test_big_query_write_new_types
 (https://issues.apache.org/jira/browse/BEAM-7992)
BEAM-7827: MetricsTest$AttemptedMetricTests.testAllAttemptedMetrics is 
flaky on DirectRunner (https://issues.apache.org/jira/browse/BEAM-7827)
BEAM-7752: Java Validates DirectRunner: 
testTeardownCalledAfterExceptionInFinishBundleStateful flaky 

P1 issues report

2021-05-21 Thread Beam Jira Bot
This is your daily summary of Beam's current P1 issues, not including flaky 
tests.

See https://beam.apache.org/contribute/jira-priorities/#p1-critical for the 
meaning and expectations around P1 issues.

BEAM-12389: beam_PostCommit_XVR_Dataflow flaky: Expand method not found 
(https://issues.apache.org/jira/browse/BEAM-12389)
BEAM-12387: beam_PostCommit_Python* timing out 
(https://issues.apache.org/jira/browse/BEAM-12387)
BEAM-12386: beam_PostCommit_Py_VR_Dataflow(_V2) failing metrics tests 
(https://issues.apache.org/jira/browse/BEAM-12386)
BEAM-12380: Go SDK Kafka IO Transform implemented via XLang 
(https://issues.apache.org/jira/browse/BEAM-12380)
BEAM-12374: Spark postcommit failing ResumeFromCheckpointStreamingTest 
(https://issues.apache.org/jira/browse/BEAM-12374)
BEAM-12316: LGPL in bundled dependencies 
(https://issues.apache.org/jira/browse/BEAM-12316)
BEAM-12310: beam_PostCommit_Java_DataflowV2 failing 
(https://issues.apache.org/jira/browse/BEAM-12310)
BEAM-12279: Implement destination-dependent sharding in FileIO.writeDynamic 
(https://issues.apache.org/jira/browse/BEAM-12279)
BEAM-12258: SQL postcommit timing out 
(https://issues.apache.org/jira/browse/BEAM-12258)
BEAM-12256: PubsubIO.readAvroGenericRecord creates SchemaCoder that fails 
to decode some Avro logical types 
(https://issues.apache.org/jira/browse/BEAM-12256)
BEAM-12231: beam_PostRelease_NightlySnapshot failing 
(https://issues.apache.org/jira/browse/BEAM-12231)
BEAM-11959: Python Beam SDK Harness hangs when installing pip packages 
(https://issues.apache.org/jira/browse/BEAM-11959)
BEAM-11906: No trigger early repeatedly for session windows 
(https://issues.apache.org/jira/browse/BEAM-11906)
BEAM-11875: XmlIO.Read does not handle XML encoding per spec 
(https://issues.apache.org/jira/browse/BEAM-11875)
BEAM-11828: JmsIO is not acknowledging messages correctly 
(https://issues.apache.org/jira/browse/BEAM-11828)
BEAM-11755: Cross-language consistency (RequiresStableInputs) is quietly 
broken (at least on portable flink runner) 
(https://issues.apache.org/jira/browse/BEAM-11755)
BEAM-11578: `dataflow_metrics` (python) fails with TypeError (when int 
overflowing?) (https://issues.apache.org/jira/browse/BEAM-11578)
BEAM-11576: Go ValidatesRunner failure: TestFlattenDup on Dataflow Runner 
(https://issues.apache.org/jira/browse/BEAM-11576)
BEAM-11434: Expose Spanner admin/batch clients in Spanner Accessor 
(https://issues.apache.org/jira/browse/BEAM-11434)
BEAM-11227: Upgrade beam-vendor-grpc-1_26_0-0.3 to fix CVE-2020-27216 
(https://issues.apache.org/jira/browse/BEAM-11227)
BEAM-11148: Kafka commitOffsetsInFinalize OOM on Flink 
(https://issues.apache.org/jira/browse/BEAM-11148)
BEAM-11017: Timer with dataflow runner can be set multiple times (dataflow 
runner) (https://issues.apache.org/jira/browse/BEAM-11017)
BEAM-10670: Make non-portable Splittable DoFn the only option when 
executing Java "Read" transforms 
(https://issues.apache.org/jira/browse/BEAM-10670)
BEAM-10617: python CombineGlobally().with_fanout() cause duplicate combine 
results for sliding windows (https://issues.apache.org/jira/browse/BEAM-10617)
BEAM-10569: SpannerIO tests don't actually assert anything. 
(https://issues.apache.org/jira/browse/BEAM-10569)
BEAM-10288: Quickstart documents are out of date 
(https://issues.apache.org/jira/browse/BEAM-10288)
BEAM-10244: Populate requirements cache fails on poetry-based packages 
(https://issues.apache.org/jira/browse/BEAM-10244)
BEAM-10100: FileIO writeDynamic with AvroIO.sink not writing all data 
(https://issues.apache.org/jira/browse/BEAM-10100)
BEAM-9564: Remove insecure ssl options from MongoDBIO 
(https://issues.apache.org/jira/browse/BEAM-9564)
BEAM-9455: Environment-sensitive provisioning for Dataflow 
(https://issues.apache.org/jira/browse/BEAM-9455)
BEAM-9293: Python direct runner doesn't emit empty pane when it should 
(https://issues.apache.org/jira/browse/BEAM-9293)
BEAM-8986: SortValues may not work correct for numerical types 
(https://issues.apache.org/jira/browse/BEAM-8986)
BEAM-8985: SortValues should fail if SecondaryKey coder is not 
deterministic (https://issues.apache.org/jira/browse/BEAM-8985)
BEAM-8407: [SQL] Some Hive tests throw NullPointerException, but get marked 
as passing (Direct Runner) (https://issues.apache.org/jira/browse/BEAM-8407)
BEAM-7717: PubsubIO watermark tracking hovers near start of epoch 
(https://issues.apache.org/jira/browse/BEAM-7717)
BEAM-7716: PubsubIO returns empty message bodies for all messages read 
(https://issues.apache.org/jira/browse/BEAM-7716)
BEAM-7195: BigQuery - 404 errors for 'table not found' when using dynamic 
destinations - sometimes, new table fails to get created 
(https://issues.apache.org/jira/browse/BEAM-7195)
BEAM-6839: User reports protobuf ClassChangeError running against 2.6.0 or 
above 

One Pager - Test Command Line Discoverability in Beam

2021-05-21 Thread Alex Amato
Hi, I have had some issues determining how to run Beam tests. I have
written a one-pager for review and would like your feedback, to solve the
following problem:

"A Beam developer is looking at a test file, such as
“BigQueryTornadoesIT.java” and wants to run this test. But they do not know
the command line they need to type to run this test."

I would like your feedback to get toward a more concrete proposal. A few
possible solutions are mentioned in the one-pager, but any solution that
makes it very easy to understand how to run the test is a viable option as
well.

Cheers,
Alex


Re: Flake trends - better?

2021-05-21 Thread Tyson Hamilton
There is a cleanup in progress right now by the folks at Wizeline for the
following query [1]. They're going to check and see which bugs are obsolete
or still a problem and fix some of them as well.

[1]:
project="BEAM" AND status="OPEN"  AND labels in (flake, currently-failing)

Link:
https://issues.apache.org/jira/issues/?jql=project%3DBEAM%20AND%20status%3DOPEN%20%20AND%20labels%20in%20(flake%2C%20currently-failing)%20AND%20createdDate%20%3C%20startOfMonth()



On Fri, May 21, 2021 at 8:40 AM Kenneth Knowles  wrote:

> Yes, I think that a one-time focused push to triage old "flake" tags might
> have a long-term impact. We currently have 35 open "flake" labeled bugs,
> with an average age of about 350 days, and a significant number of very old
> bugs that are almost certainly obsolete.
>
> That does not mean all is well, because I think often the solution is to
> disable the flake to restore signal, so then we need to watch the ignored
> tests. I have used the "sickbay" tag on Jira but we could choose a more
> self-explanatory one. Also it would be better to pull the info from the
> code directly.
>
> Kenn
>
> On Mon, May 10, 2021 at 12:32 PM Ahmet Altay  wrote:
>
>> Any suggestions on how to clean this up? We can organize a cleanup to
>> reduce the numbers a bit. Ideally we need to find a way to prevent the
>> future growth but a temporary reduction might make it easier for us to keep
>> reviewing and closing new issues.
>>
>> On Mon, May 10, 2021 at 9:11 AM Brian Hulette 
>> wrote:
>>
>>> In addition to stale flake jiras, I think there are also many tracking
>>> tests that were disabled years ago due to flakiness.
>>>
>>> On Sat, May 8, 2021 at 1:39 PM Kenneth Knowles  wrote:
>>>
 Oh the second chart is not automatically associated with the
 board/filter. Here is the correct link:
 https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=filter-12350547=daily=300=12319527=com.atlassian.jira.jira-core-reports-plugin%3Aaverageage-report_token=A5KQ-2QAV-T4JA-FDED_ea6ac783c727523cf6bfed04ba94ce91bb62da91_lin=Next

 On Sat, May 8, 2021 at 1:37 PM Kenneth Knowles  wrote:

> The second chart is clearly bad and getting worse. Our flake bugs are
> not getting addressed in a timely manner.
>
> Zooming in on the first chart for the last 3 months you can see a
> notable change:
> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=464=BEAM=reporting=cumulativeFlowDiagram=1174=1175=2038=2039=2040=90.
> This will not change the average in the second chart very quickly.
>
> It may be just cleanup. That seems likely. Anecdotally, I have done a
> lot of triage recently of failures and I know of only two severe flakes
> (that you can count on seeing in a day/week). If so, then more cleanup
> would be valuable. This is why I ran the second report: I suspected that we
> had a lot of very old stale flake bugs that no one is looking at.
>
> Kenn
>
> On Fri, May 7, 2021 at 4:37 PM Ahmet Altay  wrote:
>
>> Thank you for sharing the charts.
>>
>> I know you are the messenger here, but I disagree with the message
>> that flakes are getting noticeably better. The number of open issues looks
>> quite large but at least stable. I would guess that some of those are stale;
>> seemingly we did a cleanup in July 2020. We can try that again. The second
>> chart shows a bad picture IMO. Issues staying open for 500-600 days on
>> average sounds really long.
>>
>> On Fri, May 7, 2021 at 1:42 PM Kenneth Knowles 
>> wrote:
>>
>>> Alright, I think it should be fixed. The underlying saved filter had
>>> not been shared.
>>>
>>> Kenn
>>>
>>> On Fri, May 7, 2021 at 8:02 AM Brian Hulette 
>>> wrote:
>>>
 The first link doesn't work for me, I just see a blank page with
 some jira header and navbar. Do I need some additional permissions?

 If I click over to "Kanban Board" on the toggle at the top right I
 see a card with "Error: The requested board cannot be viewed because it
 either does not exist or you do not have permission to view it."

 Brian

 On Thu, May 6, 2021 at 5:56 PM Kenneth Knowles 
 wrote:

> I spoke too soon?
>
>
> https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=project-12319527=daily=300=12319527=com.atlassian.jira.jira-core-reports-plugin%3Aaverageage-report_token=A5KQ-2QAV-T4JA-FDED_ea6ac783c727523cf6bfed04ba94ce91bb62da91_lin=Next
>
> On Thu, May 6, 2021 at 5:54 PM Kenneth Knowles 
> wrote:
>
>> I made a quick* Jira chart to see how we are doing at flakes:
>>
>>
>> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=464=BEAM=reporting=cumulativeFlowDiagram=1174=1175=2038=2039=2040

Re: How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Jeff Klukas
On Fri, May 21, 2021 at 11:47 AM Kenneth Knowles  wrote:

> +dev  +Reuven Lax 
>
> Advancing the watermark to infinity does have an effect on the
> GlobalWindow. The GlobalWindow ends a little bit before infinity :-). That
> is why this works to cause the output even for unbounded aggregations.
>

I'm definitely glad to hear that GlobalWindow is supposed to close on
Drain, so it sounds like the FixedWindows workaround is not necessary.

If the watermark advances with the intention of causing windows to close,
then it's unclear to me in what cases droppedDueToLateness would be
expected, and whether it would be expected in our case.

Is it possible that the watermark is advanced while there are still
messages working their way through the pipeline, so that by the time they
hit the aggregation, they're considered late? If so, is there a way to
prevent that?
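
For reference, here is a simplified, self-contained sketch of the kind of
windowing configuration we are describing (the real pipeline is linked as [1]
in the quoted message below; the bounded Create source and the discarding
accumulation mode are placeholders for illustration only):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class ProcessingTimeCountsSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        PCollection<KV<String, Long>> counts =
            pipeline
                // Stand-in for the unbounded source in the real pipeline.
                .apply(Create.of(KV.of("event-a", 1), KV.of("event-a", 2), KV.of("event-b", 3)))
                .apply(
                    Window.<KV<String, Integer>>into(new GlobalWindows())
                        // Fire a pane roughly every ten minutes of processing time.
                        .triggering(
                            Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardMinutes(10))))
                        .discardingFiredPanes()
                        .withAllowedLateness(Duration.ZERO))
                .apply(Count.perKey());
        // In the real pipeline, `counts` feeds the transform that posts to the HTTP API.
        pipeline.run().waitUntilFinish();
      }
    }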


> On Fri, May 21, 2021 at 5:10 AM Jeff Klukas  wrote:
>
>> Beam users,
>>
>> We're attempting to write a Java pipeline that uses Count.perKey() to
>> collect event counts, and then flush those to an HTTP API every ten minutes
>> based on processing time.
>>
>> We've tried expressing this using GlobalWindows with an
>> AfterProcessingTime trigger, but we find that when we drain the pipeline
>> we end up with entries in the droppedDueToLateness metric. This was
>> initially surprising, but may be in line with documented behavior [0]:
>>
>> > When you issue the Drain command, Dataflow immediately closes any
>> in-process windows and fires all triggers. The system does not wait for any
>> outstanding time-based windows to finish. Dataflow causes open windows to
>> close by advancing the system watermark to infinity
>>
>> Perhaps advancing watermark to infinity has no effect on GlobalWindows,
>> so we attempted to get around this by using a fixed but arbitrarily-long
>> window:
>>
>> FixedWindows.of(Duration.standardDays(36500))
>>
>> The first few tests with this configuration came back clean, but the
>> third test again showed droppedDueToLateness after calling Drain. You can
>> see this current configuration in [1].
>>
>> Is there a pattern for reliably flushing on Drain when doing processing
>> time-based aggregates like this?
>>
>> [0]
>> https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#effects_of_draining_a_job
>> [1]
>> https://github.com/mozilla/gcp-ingestion/pull/1689/files#diff-1d75ce2cbda625465d5971a83d842dd35e2eaded2c2dd2b6c7d0d7cdfd459115R58-R71
>>
>>


Re: [DISCUSSION] Docker based development environment issue

2021-05-21 Thread Brian Hulette
I think the build environment was set up with that configured:
https://github.com/apache/beam/blob/40326dd0a2a1c9b5dcbbcd6486a43e3875a64a43/start-build-env.sh#L110
Could there be something about your environment preventing that from
working?

Brian

On Fri, May 21, 2021 at 3:34 AM Gleb Kanterov  wrote:

> Is it possible to mount the Docker socket inside the build-env Docker
> container? We run a lot of similar tests in CI, and it always worked:
>
> --mount type=bind,source=/var/run/docker.sock,target=/var/run/docker.sock
>
> On Fri, May 21, 2021 at 12:26 PM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> Hello,
>>
>> Beam provides a very cool feature to run a local development environment
>> via Docker [1]. In the same time, some unit tests require to run Docker
>> containers to test against “real” instances (for example,
>> ClickHouseIOTest). So, it will end up with “docker-in-docker” issue and
>> such tests will fail.
>>
>> What would be a proper solution for that? Annotate these tests with a
>> specific “DockerImageRequired” annotation and skip them when running from
>> inside a container, or something else? Any ideas on this?
>>
>> Thanks,
>> Alexey
>>
>>
>> [1] https://github.com/apache/beam/blob/master/start-build-env.sh
>
>


Re: How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Kenneth Knowles
+dev  +Reuven Lax 

Advancing the watermark to infinity does have an effect on the
GlobalWindow. The GlobalWindow ends a little bit before infinity :-). That
is why this works to cause the output even for unbounded aggregations.

Kenn

On Fri, May 21, 2021 at 5:10 AM Jeff Klukas  wrote:

> Beam users,
>
> We're attempting to write a Java pipeline that uses Count.perKey() to
> collect event counts, and then flush those to an HTTP API every ten minutes
> based on processing time.
>
> We've tried expressing this using GlobalWindows with an
> AfterProcessingTime trigger, but we find that when we drain the pipeline
> we end up with entries in the droppedDueToLateness metric. This was
> initially surprising, but may be in line with documented behavior [0]:
>
> > When you issue the Drain command, Dataflow immediately closes any
> in-process windows and fires all triggers. The system does not wait for any
> outstanding time-based windows to finish. Dataflow causes open windows to
> close by advancing the system watermark to infinity
>
> Perhaps advancing watermark to infinity has no effect on GlobalWindows, so
> we attempted to get around this by using a fixed but arbitrarily-long
> window:
>
> FixedWindows.of(Duration.standardDays(36500))
>
> The first few tests with this configuration came back clean, but the third
> test again showed droppedDueToLateness after calling Drain. You can see
> this current configuration in [1].
>
> Is there a pattern for reliably flushing on Drain when doing processing
> time-based aggregates like this?
>
> [0]
> https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#effects_of_draining_a_job
> [1]
> https://github.com/mozilla/gcp-ingestion/pull/1689/files#diff-1d75ce2cbda625465d5971a83d842dd35e2eaded2c2dd2b6c7d0d7cdfd459115R58-R71
>
>


Re: Flake trends - better?

2021-05-21 Thread Kenneth Knowles
Yes, I think that a one-time focused push to triage old "flake" tags might
have a long-term impact. We currently have 35 open "flake" labeled bugs,
with an average age of about 350 days, and a significant number of very old
bugs that are almost certainly obsolete.

That does not mean all is well, because I think often the solution is to
disable the flake to restore signal, so then we need to watch the ignored
tests. I have used the "sickbay" tag on Jira but we could choose a more
self-explanatory one. Also it would be better to pull the info from the
code directly.

Kenn

On Mon, May 10, 2021 at 12:32 PM Ahmet Altay  wrote:

> Any suggestions on how to clean this up? We can organize a cleanup to
> reduce the numbers a bit. Ideally we need to find a way to prevent the
> future growth but a temporary reduction might make it easier for us to keep
> reviewing and closing new issues.
>
> On Mon, May 10, 2021 at 9:11 AM Brian Hulette  wrote:
>
>> In addition to stale flake jiras, I think there are also many tracking
>> tests that were disabled years ago due to flakiness.
>>
>> On Sat, May 8, 2021 at 1:39 PM Kenneth Knowles  wrote:
>>
>>> Oh the second chart is not automatically associated with the
>>> board/filter. Here is the correct link:
>>> https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=filter-12350547=daily=300=12319527=com.atlassian.jira.jira-core-reports-plugin%3Aaverageage-report_token=A5KQ-2QAV-T4JA-FDED_ea6ac783c727523cf6bfed04ba94ce91bb62da91_lin=Next
>>>
>>> On Sat, May 8, 2021 at 1:37 PM Kenneth Knowles  wrote:
>>>
 The second chart is clearly bad and getting worse. Our flake bugs are
 not getting addressed in a timely manner.

 Zooming in on the first chart for the last 3 months you can see a
 notable change:
 https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=464=BEAM=reporting=cumulativeFlowDiagram=1174=1175=2038=2039=2040=90.
 This will not change the average in the second chart very quickly.

 It may be just cleanup. That seems likely. Anecdotally, I have done a
 lot of triage recently of failures and I know of only two severe flakes
 (that you can count on seeing in a day/week). If so, then more cleanup
 would be valuable. This is why I ran the second report: I suspected that we
had a lot of very old stale flake bugs that no one is looking at.

 Kenn

 On Fri, May 7, 2021 at 4:37 PM Ahmet Altay  wrote:

> Thank you for sharing the charts.
>
> I know you are the messenger here, but I disagree with the message
> that flakes are getting noticeably better. The number of open issues looks
> quite large but at least stable. I would guess that some of those are stale;
> seemingly we did a cleanup in July 2020. We can try that again. The second
> chart shows a bad picture IMO. Issues staying open for 500-600 days on
> average sounds really long.
>
> On Fri, May 7, 2021 at 1:42 PM Kenneth Knowles 
> wrote:
>
>> Alright, I think it should be fixed. The underlying saved filter had
>> not been shared.
>>
>> Kenn
>>
>> On Fri, May 7, 2021 at 8:02 AM Brian Hulette 
>> wrote:
>>
>>> The first link doesn't work for me, I just see a blank page with
>>> some jira header and navbar. Do I need some additional permissions?
>>>
>>> If I click over to "Kanban Board" on the toggle at the top right I
>>> see a card with "Error: The requested board cannot be viewed because it
>>> either does not exist or you do not have permission to view it."
>>>
>>> Brian
>>>
>>> On Thu, May 6, 2021 at 5:56 PM Kenneth Knowles 
>>> wrote:
>>>
 I spoke too soon?


 https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=project-12319527=daily=300=12319527=com.atlassian.jira.jira-core-reports-plugin%3Aaverageage-report_token=A5KQ-2QAV-T4JA-FDED_ea6ac783c727523cf6bfed04ba94ce91bb62da91_lin=Next

 On Thu, May 6, 2021 at 5:54 PM Kenneth Knowles 
 wrote:

> I made a quick* Jira chart to see how we are doing at flakes:
>
>
> https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=464=BEAM=reporting=cumulativeFlowDiagram=1174=1175=2038=2039=2040
>
> Looking a lot better recently at resolving them! (whether these
> are new fixes or just resolving stale bugs, I love it)
>
> Kenn
>
> *AFAICT you need to make a saved search, then an agile board based
> on the saved search, then you can look at reports
>



Re: Some questions around GroupIntoBatches

2021-05-21 Thread Reuven Lax
Ah I see, you want to set the event-time timer in the future. Could also
accomplish this with an end-of-window timer for which we keep updating the
hold.

Unfortunately, this still doesn't quite work. The output will need to
happen from the processing-time timer, so Beam will likely reject it due to
being an output "in the past". In the past is determined by comparing
the output timestamp to the timestamp of the current element being
processed (for the case of timers, the timestamp of the timer is the output
timestamp of that timer). The fact that there is another timer extant
with that output timestamp doesn't help here.

On Fri, May 21, 2021 at 2:06 AM Jan Lukavský  wrote:

> If I understand it correctly (and what I have observed from the actual
> behavior on both FlinkRunner and DirectRunner) a relative timer with zero
> duration will not fire immediately. It has to wait for the watermark to
> advance. It requires fixing [1] for the relative timer with output
> timestamp to work reliably.
>
>  Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-12276
> On 5/20/21 7:08 PM, Reuven Lax wrote:
>
> I don't think that would work well. It's entirely possible that the input
> watermark will already have passed the timestamp of the hold, in which case
> an event-time timer would fire immediately. You could make it a looping
> timer, but the new timer would also fire immediately after being set, and a
> tight timer loop isn't a good idea.
>
> A Timer.get() is one solution, though I think the only way we have to
> implement it is to store the timer's timestamp in a ValueState; doing this
> for every timer would add a lot of cost to pipelines.
>
> Another option is a Timer.getCurrentTime() method, that would return the
> current base time that setRelative is based off of. It seems like a strange
> function to add to Timer though.
>
> Another option is to use TimerMap to bucket timers. Every minute we round
> the current processing time to the nearest minute and set a timer with an
> expiration of that minute (and with the minute timestamp as its tag as
> well). This way we would have a continuous sequence of expiring timers, and
> we wouldn't have to set just the first one. The biggest problem with this
> approach is that we would also have to use MapState to store the desired
> watermark hold per  processing-time bucket. MapState is not supported by
> many runners yet, so I don't want to use it in a basic transform like
> GroupIntoBatches (furthermore - a transform that works on most runners
> today).
>
> Reuven
>
>
> On Thu, May 20, 2021 at 2:11 AM Jan Lukavský  wrote:
>
>> Sounds like you could solve that using second event time timer, that
>> would be actually used only to hold the output timestamp (watermark hold).
>> Something like
>>
>>
>> eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()
>>
>> when the timer fires, you would only reset the minimum.
>>
>> It is sort of ugly, though. It would be cool to have a way to get the
>> current timestamp a timer is set to (if any).
>>
>>  Jan
>> On 5/20/21 3:12 AM, Reuven Lax wrote:
>>
>> 100% - the contract should not change because things are in a bundle.
>> IIRC there are some open bugs in Beam around this that really should be
>> fixed.
>>
>> My issue with GroupIntoBatches is different. This transform works as
>> follows:
>>
>> if (this is the first element in batch - checked by reading a count
>> stored in a ValueState)
>>timer.offset(bufferingDuration).setRelative()
>>
>> This makes it tricky to use setTimer.withOutputTimestamp. Inputs are not
>> guaranteed to be in order, so simply adding a withOutputTimestamp would set
>> the timestamp to be whatever the first element happened to be; it really
>> should be the minimum timestamp of all elements in the buffer. If we
>> started setting the timer on every element, then timer.offset.setRelative
>> would keep bumping the (processing-time) timer into the future and it would
>> never expire.
>>
>> One solution would be to store the timer timestamp in a ValueState, and
>> use Timer.set to set the timer to an absolute timestamp. This would allow
>> us to always reset the timer to the same expiration target, just modifying
>> the output timestamp each time. However, this will break DirectRunner
>> tests. The DirectRunner allows the user to control the advancement of
>> processing time when using TestStream, but this facility doesn't work well
>> if the transform sets the processing-time timer using absolute set() calls.
>>
>> I'm not sure how to solve this using the existing Timer API.
>>
>> On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw 
>> wrote:
>>
>>> +1. It was my understanding as well that consensus was that timers
>>> must be delivered in timestamp order, and "within bundle"
>>> resetting/clearing of timers should be respected (as if each timer was
>>> in its own bundle).
>>>
>>> On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles  wrote:
>>> >
>>> > Reading over 

Re: [DISCUSSION] Docker based development environment issue

2021-05-21 Thread Gleb Kanterov
Is it possible to mount the Docker socket inside the build-env Docker
container? We run a lot of similar tests in CI, and it always worked:

--mount type=bind,source=/var/run/docker.sock,target=/var/run/docker.sock

On Fri, May 21, 2021 at 12:26 PM Alexey Romanenko 
wrote:

> Hello,
>
> Beam provides a very cool feature to run a local development environment
> via Docker [1]. At the same time, some unit tests require running Docker
> containers to test against “real” instances (for example,
> ClickHouseIOTest). So, we end up with a “docker-in-docker” issue and
> such tests will fail.
>
> What would be a proper solution for that? Annotate these tests with a
> specific “DockerImageRequired” annotation and skip them when running from
> inside a container, or something else? Any ideas on this?
>
> Thanks,
> Alexey
>
>
> [1] https://github.com/apache/beam/blob/master/start-build-env.sh


[DISCUSSION] Docker based development environment issue

2021-05-21 Thread Alexey Romanenko
Hello,

Beam provides a very cool feature to run a local development environment via 
Docker [1]. At the same time, some unit tests require running Docker containers 
to test against “real” instances (for example, ClickHouseIOTest). So, we end 
up with a “docker-in-docker” issue and such tests will fail. 

What would be a proper solution for that? Annotate these tests with a specific 
“DockerImageRequired” annotation and skip them when running from inside a 
container, or something else? Any ideas on this?
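
To make the idea a bit more concrete, here is a rough JUnit 4 sketch of such
a guard (the class name and the /.dockerenv check are illustrative assumptions
on my side, not existing Beam code; it could equally be wrapped into an
annotation or a test rule):

    import java.io.File;
    import org.junit.Assume;
    import org.junit.Before;
    import org.junit.Test;

    // Sketch: skip Docker-dependent tests when the test JVM itself already runs
    // inside a container (e.g. the start-build-env.sh environment), since starting
    // containers from there runs into the docker-in-docker problem.
    public class DockerRequiredTestSketch {

      @Before
      public void skipWhenInsideContainer() {
        // Docker creates /.dockerenv inside containers, so its presence is a
        // simple (if imperfect) heuristic for "we are already containerized".
        Assume.assumeFalse(
            "Already running inside a Docker container; skipping Docker-based test",
            new File("/.dockerenv").exists());
      }

      @Test
      public void testAgainstRealInstance() {
        // ... the part of the test that would start e.g. a ClickHouse container ...
      }
    }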

Thanks,
Alexey


[1] https://github.com/apache/beam/blob/master/start-build-env.sh

Re: Some questions around GroupIntoBatches

2021-05-21 Thread Jan Lukavský
If I understand it correctly (and what I have observed from the actual 
behavior on both FlinkRunner and DirectRunner) a relative timer with 
zero duration will not fire immediately. It has to wait for the 
watermark to advance. It requires fixing [1] for the relative timer with 
output timestamp to work reliably.


 Jan

[1] https://issues.apache.org/jira/browse/BEAM-12276

On 5/20/21 7:08 PM, Reuven Lax wrote:
I don't think that would work well. It's entirely possible that the 
input watermark will already have passed the timestamp of the hold, in 
which case an event-time timer would fire immediately. You could make 
it a looping timer, but the new timer would also fire immediately 
after being set, and a tight timer loop isn't a good idea.


A Timer.get() is one solution, though I think the only way we have to 
implement it is to store the timer's timestamp in a ValueState; doing 
this for every timer would add a lot of cost to pipelines.


Another option is a Timer.getCurrentTime() method, that would return 
the current base time that setRelative is based off of. It seems like 
a strange function to add to Timer though.


Another option is to use TimerMap to bucket timers. Every minute we 
round the current processing time to the nearest minute and set a 
timer with an expiration of that minute (and with the minute timestamp 
as its tag as well). This way we would have a continuous sequence of 
expiring timers, and we wouldn't have to set just the first one. The 
biggest problem with this approach is that we would also have to use 
MapState to store the desired watermark hold per  processing-time 
bucket. MapState is not supported by many runners yet, so I don't want 
to use it in a basic transform like GroupIntoBatches (furthermore - a 
transform that works on most runners today).


Reuven


On Thu, May 20, 2021 at 2:11 AM Jan Lukavský wrote:


Sounds like you could solve that using second event time timer,
that would be actually used only to hold the output timestamp
(watermark hold). Something like


eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()

when the timer fires, you would only reset the minimum.

It is sort of ugly, though. It would be cool to have a way to get
the current timestamp a timer is set to (if any).

 Jan

On 5/20/21 3:12 AM, Reuven Lax wrote:

100% - the contract should not change because things are in a
bundle. IIRC there are some open bugs in Beam around this that
really should be fixed.

My issue with GroupIntoBatches is different. This transform works
as follows:

if (this is the first element in batch - checked by reading a
count stored in a ValueState)
   timer.offset(bufferingDuration).setRelative()

This makes it tricky to use setTimer.withOutputTimestamp. Inputs
are not guaranteed to be in order, so simply adding a
withOutputTimestamp would set the timestamp to be whatever the
first element happened to be; it really should be the minimum
timestamp of all elements in the buffer. If we started setting
the timer on every element, then timer.offset.setRelative would
keep bumping the (processing-time) timer into the future and it
would never expire.

One solution would be to store the timer timestamp in a
ValueState, and use Timer.set to set the timer to an absolute
timestamp. This would allow us to always reset the timer to the
same expiration target, just modifying the output timestamp each
time. However, this will break DirectRunner tests. The
DirectRunner allows the user to control the advancement of
processing time when using TestStream, but this facility doesn't
work well if the transform sets the processing-time timer using
absolute set() calls.

I'm not sure how to solve this using the existing Timer API.

On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw
<rober...@google.com> wrote:

+1. It was my understanding as well that consensus was that
timers
must be delivered in timestamp order, and "within bundle"
resetting/clearing of timers should be respected (as if each
timer was
in its own bundle).

On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles
<k...@apache.org> wrote:
>
> Reading over the other thread, there was consensus to
implement.
>
> Reading commentary on the PR, there were good questions
raised about the semantics. Questions which I feel able to
have an opinion about :-)
>
> The questions surrounded bundling and timers in the same
bundle clearing each other. Actually the same questions
apply to timers re-setting later timers and +Jan Lukavský has
raised this already (among other people) so we kind of know
the answer now, and I think +Boyuan Zhang code was good