Re: BEAM-6855

2021-02-23 Thread Ahmet Altay
Hemali, would this be a reasonable workaround for your problem?

/cc +Kenneth Knowles  - In case there is an alternative
workaround to BEAM-6855.
/cc +Cosmin Arad 

On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette  wrote:

> I added JvmInitializer [1] to do some one-time initialization per JVM
> before processing starts. It might be useful here... the intended use-case
> was to perform quick configuration functions, but I suppose you could use
> it to pull some data that you can reference later.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
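>
> For illustration, a minimal initializer along these lines might look like
> the sketch below. It is registered via ServiceLoader (here with
> @AutoService); CloudSqlCache and its load() method are placeholders for
> your own code, not part of Beam.
>
> import com.google.auto.service.AutoService;
> import org.apache.beam.sdk.harness.JvmInitializer;
> import org.apache.beam.sdk.options.PipelineOptions;
>
> @AutoService(JvmInitializer.class)
> public class CachePrefetcher implements JvmInitializer {
>   @Override
>   public void beforeProcessing(PipelineOptions options) {
>     // Runs once per worker JVM before element processing starts.
>     CloudSqlCache.load();  // placeholder: read from CloudSQL and build the cache
>   }
> }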
>
> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada  wrote:
>
>> +Brian Hulette  I believe you worked on a way to
>> load data on worker startup?
>>
>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins 
>> wrote:
>>
>>> The getState function should be static, sorry. "synchronized static
>>> @NotNull MyState getState()"
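>>>
>>> With that correction (and, since the field and accessor are static, the
>>> lock being the class itself), the holder would look roughly like this
>>> (a sketch, not tested):
>>>
>>> final class StateLoader {
>>>   private StateLoader() {}
>>>
>>>   @GuardedBy("StateLoader.class")
>>>   private static @Nullable MyState state;
>>>
>>>   static synchronized @NotNull MyState getState() {
>>>     // Loads at most once per worker JVM, on first use.
>>>     if (state == null) {
>>>       state = LoadStateFromSQL();
>>>     }
>>>     return state;
>>>   }
>>> }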
>>>
>>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins 
>>> wrote:
>>>
 > On every dataflow start, I want to read from CloudSQL and build the
 cache

 If you do this outside of dataflow, you can use a static to do this on
 every worker start. Is that what you're looking for? For example:

 final class StateLoader {
   private StateLoader() {}

   @GuardedBy("this")
   private static @Nullable MyState state;

   synchronized @NotNull MyState getState() {
     if (state == null) {
       state = LoadStateFromSQL();
     }
     return state;
   }
 }
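
 The stateful DoFn can then read the cache directly instead of taking it as
 a side input (which is what runs into BEAM-6855); a rough sketch, with the
 element types assumed:

 @ProcessElement
 public void process(@Element KV<String, DataTunnelStatus> input,
     MultiOutputReceiver out) {
   // Loaded once per worker JVM by the static holder above.
   MyState cache = StateLoader.getState();
   // ... look up / enrich the element using the cache ...
 }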

 On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
 hsuta...@paloaltonetworks.com> wrote:

> Hi,
>
> I have one question. This is *kind of a blocker for our upcoming
> release*. It would be great if you could reply at your earliest
> convenience.
>
> My dataflow pipeline is stateful. I am using the Beam SDK's stateful
> processing (StateId, ValueState), and I have also implemented OnTimer for my
> stateful transform. On every Dataflow start, I want to read from CloudSQL and
> build a cache. For that, I need to provide the pre-built cache as a side
> input to my transform, but it looks like there is some issue when I add a
> side input to my stateful transform. I think I am hitting BEAM-6855
> (https://issues.apache.org/jira/browse/BEAM-6855). Is there any workaround?
> Any help would be appreciated.
>
> The following is my transform definition. I am using Beam SDK 2.23.0 and the
> GlobalWindow.
>
> private class GetLatestState
>     extends DoFn<KV<String, DataTunnelStatus>, DataTunnelStateRelational> {
>
>   @TimerId("tunnelStatusExpiryTimer")
>   private final TimerSpec tunnelStatusExpiryTimer =
>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @StateId("tunnelStatus")
>   private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>
>   @ProcessElement
>   public void process(@Element KV<String, DataTunnelStatus> input,
>       MultiOutputReceiver out,
>       @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>       @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>       ProcessContext c)
>
>
>
> Thanks,
> Hemali Sutaria
>
>


Re: Should we support VCF IO on Python 3?

2021-02-23 Thread Cory McLean
+1 to removing it from the codebase and, if it becomes of interest again,
porting to cyvcf2. In any case, most genomics workflows are not using Beam at
the moment.

On Tue, Feb 23, 2021 at 1:12 AM Chamikara Jayalath 
wrote:

> Given that we don't support Python 2 anymore, it sounds like this is just
> broken code and we cannot expect anybody to be using it (after Beam 2.24.0).
> If so, +1 for removing it from the codebase. If we decide to add it back
> with Python 3 support, we should be able to refer to the (working) 2.24.0
> implementation.
>
> Thanks,
> Cham
>
> On Mon, Feb 22, 2021 at 5:17 PM Valentyn Tymofieiev 
> wrote:
>
>> Hi Yoshiki,
>>
>> If switching the code to a new version of the VCF package is easy to do, I
>> would keep the code but make the dependency on VCF packages optional, since
>> we know that this code is not in use. If you decide to try
>> this route, https://issues.apache.org/jira/browse/BEAM-5628 mentions
>> cyvcf2 as a possible replacement.
>>
>> If replacement is not trivial and/or nobody is interested in making it
>> work, I would remove this IO.
>>
>> CC'ing a few folks who may have an opinion: +Chamikara Jayalath
>>  +Cory McLean  .
>>
>> Thanks for your help with the cleanup!
>>
>> On Sun, Feb 21, 2021 at 4:23 AM Yoshiki Obata 
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm cleaning up the Python 2 codepath now and found that the VCF IO code
>>> still remains, though it probably does not work with the latest Beam
>>> because it depends on PyVCF, which does not support Python 3.
>>> According to comments in vcfio.py, a migration to Nucleus was expected,
>>> but the comments on https://issues.apache.org/jira/browse/BEAM-5628
>>> conclude that that plan is not the right option.
>>>
>>> We now need to decide what to do with VCF IO - drop support, or maintain
>>> support using another VCF package.
>>> Does anyone have a basis for the decision?
>>>
>>> Yoshiki
>>>
>>


Re: FileIO.Write fails silently

2021-02-23 Thread Tapan Upadhyay
Yes we have checkpointing enabled in our cluster.

The pipeline runs fine when I restart it without making any code changes, but
after adding a DoFn class (one that is not even used in the pipeline), we have
observed that the pipeline still reads from Kafka and performs the windowing and
Distinct steps, but does not write to S3.

I have also checked in the Flink UI that the watermark for FileIO.Write does not
progress; it still shows the watermark from when the savepoint was taken.

On 2021/02/18 20:36:45, Reuven Lax  wrote: 
> Do you have checkpointing enabled in your Flink cluster?
> 
> On Thu, Feb 18, 2021 at 11:50 AM Tapan Upadhyay  wrote:
> 
> > Hi,
> >
> > I am currently working on a Beam pipeline (Flink runner) where we read
> > JSON events from Kafka and write the output in parquet format to S3.
> >
> > We write to S3 every 10 minutes.
> >
> > We have observed that our pipeline sometimes stops writing to S3 after
> > restarts (even for a non-breaking minor code change); if we change the Kafka
> > offset and restart the pipeline, it starts writing to S3 again.
> >
> > While the S3 write fails, the pipeline otherwise runs fine without any issues
> > and processes records up to the FileIO stage. It gives no errors/exceptions in
> > the logs but silently fails to write to S3 at the FileIO stage.
> >
> > This is the stage where it is not sending any records out -
> > FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
> > FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
> > -> FileIO.Write/WriteFiles/GatherTempFileResults/Add void
> > key/AddKeys/Map/ParMultiDo(Anonymous)
> >
> > We have checked our windowing function by logging records after windowing;
> > the windowing works fine.
> >
> > This is our code snippet -
> >
> > parquetRecord
> >     .apply("Batch Events",
> >         Window.into(FixedWindows.of(
> >                 Duration.standardMinutes(Integer.parseInt(windowTime))))
> >             .triggering(AfterWatermark.pastEndOfWindow())
> >             .withAllowedLateness(Duration.ZERO,
> >                 Window.ClosingBehavior.FIRE_ALWAYS)
> >             .discardingFiredPanes())
> >
> >     .apply(Distinct.create())
> >
> >     .apply(FileIO.write()
> >         .via(ParquetIO.sink(getOutput_schema()))
> >         .to(outputPath.isEmpty() ? outputPath() : outputPath)
> >         .withNumShards(1)
> >         .withNaming(new CustomFileNaming("snappy.parquet")));
> >
> > Any idea what could be wrong here, or are there any open bugs in Beam?
> >
> >
> > Regards,
> > Tapan Upadhyay
> >
> >
> 


Re: Do we need synchronized processing time? / What to do about "continuation triggers"?

2021-02-23 Thread Robert Bradshaw
On Tue, Feb 23, 2021 at 1:07 AM Jan Lukavský  wrote:

> First, +1 to the conclusion of this thread.
>
> One note regarding the composite transforms and triggers *inside* those
> transforms - I think that propagating the triggering from input PCollection
> might be even dangerous and composite PTransforms that would be sensitive
> to the change of triggering will (should!) override the input triggering,
> and therefore adjusting it upfront will not work. There is clear option for
> composite PTransform (which includes one or more GBKs) to create API to
> specify the _input_ triggering of the composite as a whole, i.e.
>
>  input.apply(MyComposite.create().triggering())
>
> which (consistently with how triggering works for pure GBK) would change
> the input triggering (if we define trigger as "buffer input in state, flush
> buffer when trigger fires") of the PTransform. The PTransform knows how it
> expands and so it is quite easy to do the output triggering correctly.
>
When we originally explored this (for windowing, before triggering existed)
we looked at the number of composite operations (combining, joining, cogbk,
...) that contained GBKs and realized it would add a lot of boilerplate to
manually pass through the windowing information to each. Worse, this is a
burden placed on every author of a composite operation (and omitting this
argument, or hard coding a default, would be strictly worse). Triggering
doesn't flow as nicely, but requiring it on every subtransform invocation
during pipeline construction would have the same downsides of verbosity.

> Regarding the sink triggering - out of curiosity, how does that differ
> from applying the triggering on the very first GBK(s) and then subsequently
> triggering all downstream GBKs using AfterPane.elementCountAtLeast(1)? It
> seems to me that, from a user perspective, what I want to define is not
> "how often output should be written", but "how quickly output should react
> to the change of input" - therefore I *must* trigger with at least this
> frequency from the source and then propagate each pane as quickly as
> possible to the output. Am I missing something?
>
Here "sink" is really any observable outside effect, so I think "how often
output should be written" and "how quickly output should react to the
change of input" are the same.

As an example, if I want, say, hourly output, triggering hourly at the
source and then as quickly as possible from then on may be wasteful. It may
also be desirable to arrange such that certain transforms only have a
single pane per window, which is easier to propagate up than down. As
another example, consider accumulating vs. discarding. If I have
CombineValues(sum) followed by a re-keying and another CombineValues(sum),
and I want the final output to be accumulating, the first must be
discarding (or, better, retractions). Propagating upwards is possible in a
way propagating downward is not.
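
To make that concrete, here is a rough Java sketch of the constraint (the
window, triggers, and the groupOf() re-keying function are placeholders):
the upstream sum emits discarding (delta) panes so that the downstream
accumulating sum does not double count.

  PCollection<KV<String, Long>> partials =
      input
          .apply(Window.<KV<String, Long>>into(
                  FixedWindows.of(Duration.standardMinutes(10)))
              .triggering(AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
          .apply(Sum.longsPerKey());

  PCollection<KV<String, Long>> totals =
      partials
          .apply(MapElements
              .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
              .via(kv -> KV.of(groupOf(kv.getKey()), kv.getValue())))
          .apply(Window.<KV<String, Long>>configure()
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .accumulatingFiredPanes()
              .withAllowedLateness(Duration.ZERO))
          .apply(Sum.longsPerKey());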



>
>  Jan
>
>
> On 2/22/21 9:53 PM, Reuven Lax wrote:
>
> I really wish that we had found the time to build sink triggers. Jan is
> right - specifying triggers up front and having them propagate down is
> confusing (it's also a bit confusing for Windows, but with Windows the
> propagation at least makes sense). The fact that users rarely have access
> to the actual GBK operation means that allowing them to specify triggers on
> their sinks is the best approach.
>
> On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw 
> wrote:
>
>> On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles  wrote:
>>
>>> I agree completely: Triggers control the output of the GBK.
>>>
>>> The issue is composite transforms, where there will be a GBK deep inside
>>> some code and the user cannot adjust the triggering.
>>>
>>> What a user really wants is "sink triggers" [1], a purely hypothetical
>>> feature where they specify the latency requirements on each _output_ and
>>> everything else is figured out automatically. Unfortunately, sink triggers
>>> require retractions, so each PCollection can be a complete changelog.
>>> Otherwise transformations cannot be transparently correct throughout a
>>> pipeline and triggers cannot be decoupled from pipeline logic. Retractions
>>> themselves are not necessarily complex in some cases (Flink SQL has them -
>>> they are extra easy for "pure" code) but require a massive reworking of the
>>> library of transforms, particularly IOs. And backwards compatibility
>>> concerns for existing DoFns are somewhat tricky. We've had two prototypes
>>> [2] [3] and some important design investigations [4], but no time to really
>>> finish adding them, even as just an optional experiment. And once we have
>>> retractions, there is still a lot to figure out to finish sink triggers.
>>> They may not even really be possible!
>>>
>>> So for now, we do our best with the user setting up triggering at the
>>> beginning of the pipeline instead of the end of the pipeline. The very
>>> first GBK (which may be deep in library code) is controlled by the
>>> triggering they set up, and all the rest get the "continuation trigger"
>>> which tries to just let the data flow.

Re: Do we need synchronized processing time? / What to do about "continuation triggers"?

2021-02-23 Thread Jan Lukavský

First, +1 to the conclusion of this thread.

One note regarding composite transforms and triggers *inside* those 
transforms - I think that propagating the triggering from the input 
PCollection might even be dangerous: composite PTransforms that are 
sensitive to a change of triggering will (should!) override the input 
triggering, and therefore adjusting it upfront will not work. A clear 
option for a composite PTransform (which includes one or more GBKs) is to 
create an API to specify the _input_ triggering of the composite as a 
whole, i.e.


 input.apply(MyComposite.create().triggering())

which (consistently with how triggering works for pure GBK) would change 
the input triggering (if we define trigger as "buffer input in state, 
flush buffer when trigger fires") of the PTransform. The PTransform 
knows how it expands and so it is quite easy to do the output triggering 
correctly.
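
A sketch of what such an API could look like in the Java SDK (the names, the
fixed accumulation mode and allowed lateness, and the single internal GBK are
illustrative only):

final class MyComposite<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  private final Trigger trigger;

  private MyComposite(Trigger trigger) {
    this.trigger = trigger;
  }

  static <K, V> MyComposite<K, V> create() {
    return new MyComposite<>(DefaultTrigger.of());
  }

  MyComposite<K, V> triggering(Trigger trigger) {
    return new MyComposite<>(trigger);
  }

  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    // The composite, knowing its own expansion, applies the requested input
    // triggering just before its internal GBK and stays in control of the
    // output triggering.
    return input
        .apply(Window.<KV<K, V>>configure()
            .triggering(trigger)
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(GroupByKey.<K, V>create());
  }
}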


Regarding the sink triggering - out of curiosity, how does that differ 
from applying the triggering on the very first GBK(s) and then 
subsequently triggering all downstream GBKs using 
AfterPane.elementCountAtLeast(1)? It seems to me that, from a user 
perspective, what I want to define is not "how often output should 
be written", but "how quickly output should react to the change of 
input" - therefore I *must* trigger with at least this frequency from 
the source and then propagate each pane as quickly as possible to the 
output. Am I missing something?


 Jan


On 2/22/21 9:53 PM, Reuven Lax wrote:
I really wish that we had found the time to build sink triggers. Jan 
is right - specifying triggers up front and having them propagate down 
is confusing (it's also a bit confusing for Windows, but with Windows 
the propagation at least makes sense). The fact that users rarely have 
access to the actual GBK operation means that allowing them to specify 
triggers on their sinks is the best approach.


On Mon, Feb 22, 2021 at 12:48 PM Robert Bradshaw wrote:


On Mon, Feb 22, 2021 at 11:51 AM Kenneth Knowles <k...@apache.org> wrote:

I agree completely: Triggers control the output of the GBK.

The issue is composite transforms, where there will be a GBK
deep inside some code and the user cannot adjust the triggering.

What a user really wants is "sink triggers" [1], a purely
hypothetical feature where they specify the latency
requirements on each _output_ and everything else is figured
out automatically. Unfortunately, sink triggers require
retractions, so each PCollection can be a complete changelog.
Otherwise transformations cannot be transparently correct
throughout a pipeline and triggers cannot be decoupled from
pipeline logic. Retractions themselves are not necessarily
complex in some cases (Flink SQL has them - they are extra
easy for "pure" code) but require a massive working of the
library of transforms, particularly IOs. And backwards
compatibility concerns for existing DoFns are somewhat tricky.
We've had two prototypes [2] [3] and some important design
investigations [4], but no time to really finish adding them,
even as just an optional experiment. And once we have
retractions, there is still a lot to figure out to finish sink
triggers. They may not even really be possible!

So for now, we do our best with the user setting up triggering
at the beginning of the pipeline instead of the end of the
pipeline. The very first GBK (which may be deep in library
code) is controlled by the triggering they set up and all the
rest get the "continuation trigger" which tries to just let
the data flow. Unless they set up another bit of triggering.
Some of our transforms do this for various reasons.

I think the conclusion of this particular thread is:

 - make all the SDKs use AfterSynchronizedProcessingTime triggers
 - allow runners to do whatever they want when they see
AfterSynchronizedProcessingTime trigger
 - remove TimeDomain.afterSynchronizedProcessingTime from the
proto since it is only for timers and they should not use this
 - later, figure out if we want to add support for making
downstream triggering optional (could be useful prep for sink
triggers)


+1

[1] https://s.apache.org/beam-sink-triggers
[2] https://github.com/apache/beam/pull/4742
[3] https://github.com/apache/beam/pull/9199
[4] https://s.apache.org/beam-retractions

On Mon, Feb 22, 2021 at 1:28 AM Jan Lukavský <je...@seznam.cz> wrote:

The same holds true for pane accumulation mode.

 Jan

On 2/22/21 10:21 AM, Jan Lukavský wrote:


Hi,

I'm not sure if I got eve