Re: Beam dropping events from Kafka after reshuffle ?

2024-09-25 Thread Jan Lukavský

Hi Piotr,

answer inline.

On 9/24/24 09:53, Piotr Wiśniowski wrote:


Subject: Input on Timestamps and Late Events in Beam Pipelines

Hi team,

I’d like to contribute to this discussion as I find it quite interesting.

Regarding the element timestamps mentioned by Jan, I can confirm that 
it's accurate—users can reassign element timestamps in the same way 
described. This should be sufficient for the timestamps to be 
recognized by downstream aggregation. Additionally, clock 
synchronization issues could indeed be causing late events, as Jan 
suggested.


It’s also worth noting that, by default, triggers output the 
aggregated result when they estimate that all data for a window has 
arrived, discarding any subsequent data for that window (as referenced 
in the same documentation Jan mentioned). I noticed that while your 
code defines allowed lateness, it doesn't specify a trigger to handle 
late events. As a result, these late events will likely be ignored. 
You might want to consider adding a trigger to the windowing function 
to re-output the results when late events arrive. This could help 
confirm the hypothesis, though in production, it's generally better to 
rely on the timestamps assigned by the source rather than reassigning 
them, as they should already be processing timestamps.
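
To make that concrete, a minimal Python SDK sketch of such a windowing setup - the `events` stand-in, the 300 s lateness and the late-firing trigger are illustrative choices, not something taken from the original pipeline:

```
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark

with beam.Pipeline() as p:
    events = p | beam.Create([("key", 1)])  # stand-in for the deserialized Kafka stream
    windowed = events | "Window" >> beam.WindowInto(
        window.FixedWindows(60),                        # 1-minute fixed windows, as in the pipeline discussed
        trigger=AfterWatermark(late=AfterCount(1)),     # fire on time, then re-fire for each late element
        allowed_lateness=300,                           # seconds of lateness to tolerate (example value)
        accumulation_mode=AccumulationMode.DISCARDING,  # late panes carry only the newly arrived elements
    )
```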


I also have a question for the Beam developers, or anyone who might know:

Assuming that Lydian does not reassign processing timestamps but 
instead reassigns data timestamps (which are not directly tied to 
processing time), what heuristics are used to determine when to close 
the downstream window in stream mode? Does Beam track the minimal 
timestamp seen and maintain state for this? What would the time window 
for such a heuristic be? Or, in this case, would the pipeline behave 
like it does in batch mode, halting while waiting for all data to 
arrive? I understand that the answer likely depends on the runner—I'm 
particularly interested in how this works in both Dataflow and Flink.


Beam creates watermarks propagating from sources to sinks. PTransforms 
have two watermarks - an input watermark and an output watermark. The output 
watermark might be _held back_ by some logic (typically buffers and/or 
timers). Reassigning timestamps is a stateless process, which means it 
does not interfere with watermark propagation and as such can, without 
additional care, cause late data. SDKs have access to a "watermark hold 
state" by which a stateful transform can control how the input watermark 
propagates to the output watermark. But this is not (directly) exposed to 
users. Users can control the watermark hold only through timers and their 
output timestamp, which seems to be sufficient under the Beam model.
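
To illustrate that last point in Python SDK terms, a sketch of a stateful DoFn with an event-time timer (the keyed integer values and the 60-second flush delay are assumed for the example); while the timer is pending, the runner is expected to hold the output watermark at the timer's timestamp, so the elements flushed from the callback are not late downstream:

```
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

class BufferAndFlush(beam.DoFn):
    """Buffers integer values per key and flushes their sum when a timer fires."""

    BUFFER = BagStateSpec('buffer', VarIntCoder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self,
                element,                                   # expects (key, int) pairs
                timestamp=beam.DoFn.TimestampParam,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        _, value = element
        buffer.add(value)
        # An event-time timer; while it is pending it acts as the watermark hold
        # mentioned above, keeping the output watermark from advancing past it.
        flush.set(timestamp + 60)

    @on_timer(FLUSH)
    def on_flush(self, buffer=beam.DoFn.StateParam(BUFFER)):
        yield sum(buffer.read())
        buffer.clear()
```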


Best regards,
Piotr Wiśniowski


On Tue, Sep 24, 2024 at 08:36 Jan Lukavský wrote:

Hi,

I do not use the Python SDK, but it seems that - as opposed to the Java
SDK - using a simple lambda returning TimestampedValue can really
change the timestamp of an element [1]. Maybe some more experienced
user of the Python SDK can confirm this?

Assuming this is the case, then we have two factors at play:

 a) watermarks are computed at the source transform
(ReadFromKafka) using Java millisecond precision

 b) timestamps are later reassigned using Python's time.time()

Both calls use the system clock to compute the timestamp and thus can
be influenced by clock synchronization (e.g. NTP). This can (at
least in theory) cause the second call to time.time() to return a
_smaller_ timestamp than the one used to compute the watermark,
which could turn the element into a late event. If this is the
issue, you can either increase the allowed lateness, or (maybe more
conveniently) not reassign the timestamps at all, because a
processing-time timestamp should already be assigned.
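
A sketch of the first option in the Python SDK (the helper name, the `events` input and the 120-second skew budget are illustrative); the second option is simply deleting the `beam.Map(... TimestampedValue(event, time.time()))` step and windowing on the timestamps already assigned by ReadFromKafka:

```
import apache_beam as beam
from apache_beam.transforms import window

def window_with_skew_budget(events, skew_budget_s=120):
    """Keeps the 1-minute processing-time windows but tolerates small clock skew."""
    return events | "Window" >> beam.WindowInto(
        window.FixedWindows(60),
        allowed_lateness=skew_budget_s,  # seconds; covers NTP-style clock adjustments
    )
```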

Let us know if any of this works for you!
Best,
 Jan

[1]

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

On 9/24/24 02:09, marc hurabielle wrote:


Hello,

I am jumping in on this, because we are doing the same thing as Lydian.
In our case, we are using the default timestamp strategy in Kafka (so
processing timestamp).
We were also adding the processing timestamp manually, same as Lydian.


However, we have late data. It mainly happens in our integration
tests with Flink (parallelism 1), and happens really rarely in
production.

So does it mean we can't control the timestamp of an item even with
`window.TimestampedValue(event, time.time())`?

Best,

Marc



On Tue, Sep 24, 2024, 04:23 Reuven Lax via user
 wrote:

Also, as I said, the duplicate files might not appear to be
duplicates, which can be quite confusing.

Out of curiosity, I would try - just for testing - removing the
line where you "add" processing time, and also set
allowed_lat

Re: Beam dropping events from Kafka after reshuffle ?

2024-09-20 Thread Jan Lukavský

Hi Lydian,

because you do not specify 'timestamp_policy', it should use the default, 
which is processing time, so this should not be the issue. The one 
transform potentially left is the sink transform, as Reuven mentioned. 
Can you share details of its implementation?


 Jan

On 9/19/24 23:10, Lydian Lee wrote:

Hi, Jan

Here's how we do ReadFromKafka; the expansion service is just to 
ensure we can work with xlang in k8s, so please ignore it.

from apache_beam.io.kafka import default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    with_metadata=False,
    expansion_service=default_io_expansion_service(
        append_args=[
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
            "--experiments=use_deprecated_read",
        ]
    ),
    commit_offset_in_finalize=True,
)

Do you know what the right approach for using processing time instead 
would be? I thought WindowInto was supposed to use the timestamp we 
appended to the event? Do you think it is still using the original 
Kafka event timestamp? Thanks!




On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský  wrote:

Can you share the (relevant) parameters of the ReadFromKafka
transform?

This feels strange, and it might not do what you'd expect:

| "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda
event: window.TimestampedValue(event, time.time()))

This does not change the assigned timestamp of an element, but
creates a new element which contains processing time. It will not
be used for windowing, though.
On 9/19/24 00:49, Lydian Lee wrote:

Hi Reuven,

Here's a quick look for our pipeline:
(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
        lambda event: window.TimestampedValue(event, time.time()))
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min.
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
        # although we configured lateness, because we are using processing time
        # I don't expect any late events
    )
    | "Adding random integer partition key" >> beam.Map(
        lambda event: (random.randint(1, 5), event)  # add a dummy key to reshuffle to fewer
        # partitions. Kafka has 16 partitions, but we only want to generate 2 files every minute
    )
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    | "Writing event data batches to parquet" >> WriteBatchesToS3(...)  # call boto3 to write the events into S3 in parquet format
)

Thanks!


On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user
 wrote:

How are you doing this aggregation?

On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee
 wrote:

Hi Jan,

Thanks for the recommendation. In our case, we are
windowing with the processing time, which means that
there should be no late event at all.

You’ve mentioned that GroupByKey is stateful and can
potentially drop the data. Given that after reshuffle
(add random shuffle id to the key), we then do the
aggregation (combine the data and write those data to
S3.) Do you think the example I mentioned earlier could
potentially be the reason for the dropping data?

If so, how in general is Beam able to prevent
that? Are there any suggested approaches? Thanks

On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský
 wrote:

Hi Lydian,

in that case, there is only a generic advice you can
look into. Reshuffle is a stateless operation that
should not cause dropping data. A GroupByKey on the
other hand is stateful and thus can - when dealing
with late data - drop some of them. You should be
able to confirm this looking for
'droppedDueToLateness' counter and/or log in here
[1]. This happens when elements arrive after
watermark passes element's timestamp minus allowed
lateness. If you see the log, you might need to
either change how you assign timestamps to 

Re: Beam dropping events from Kafka after reshuffle ?

2024-09-19 Thread Jan Lukavský

Can you share the (relevant) parameters of the ReadFromKafka transform?

This feels strange, and it might not do what you'd expect:

| "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: 
window.TimestampedValue(event, time.time()))


This does not change the assigned timestamp of an element, but creates a 
new element which contains processing time. It will not be used for 
windowing, though.
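
One way to verify which timestamp actually reaches the windowing step is to log it from a DoFn placed right after the reassignment; a small Python SDK sketch (the transform placement and logger usage are illustrative):

```
import logging

import apache_beam as beam

class LogElementTimestamp(beam.DoFn):
    """Passes elements through unchanged, logging the timestamp Beam has assigned."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        logging.info("element %r carries timestamp %s", element, timestamp)
        yield element

# e.g. ... | beam.Map(lambda e: window.TimestampedValue(e, time.time()))
#          | beam.ParDo(LogElementTimestamp())
#          | beam.WindowInto(...)
```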


On 9/19/24 00:49, Lydian Lee wrote:

Hi Reuven,

Here's a quick look for our pipeline:
(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
        lambda event: window.TimestampedValue(event, time.time()))
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min.
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
        # although we configured lateness, because we are using processing time
        # I don't expect any late events
    )
    | "Adding random integer partition key" >> beam.Map(
        lambda event: (random.randint(1, 5), event)  # add a dummy key to reshuffle to fewer
        # partitions. Kafka has 16 partitions, but we only want to generate 2 files every minute
    )
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    | "Writing event data batches to parquet" >> WriteBatchesToS3(...)  # call boto3 to write the events into S3 in parquet format
)

Thanks!


On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user 
 wrote:


How are you doing this aggregation?

On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee
 wrote:

Hi Jan,

Thanks for the recommendation. In our case, we are windowing
with the processing time, which means that there should be no
late event at all.

You’ve mentioned that GroupByKey is stateful and can
potentially drop the data. Given that after reshuffle (add
random shuffle id to the key), we then do the aggregation
(combine the data and write those data to S3.) Do you think
the example I mentioned earlier could potentially be the
reason for the dropping data?

If so, how in general is Beam able to prevent that?
Are there any suggested approaches? Thanks

On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský
 wrote:

Hi Lydian,

in that case, there is only a generic advice you can look
into. Reshuffle is a stateless operation that should not
cause dropping data. A GroupByKey on the other hand is
stateful and thus can - when dealing with late data - drop
some of them. You should be able to confirm this looking
for 'droppedDueToLateness' counter and/or log in here [1].
This happens when elements arrive after watermark passes
element's timestamp minus allowed lateness. If you see the
log, you might need to either change how you assign
timestamps to elements (e.g. use log append time) or
increase allowed lateness of your windowfn.

Best,

 Jan

[1]

https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132

On 9/18/24 08:53, Lydian Lee wrote:

I would love to, but there are some limitations on our
end, so the version bump won't happen soon. Thus I
need to figure out what the root cause might be.


On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský
 wrote:

Hi Lydian,

2.41.0 is quite old, can you please try current
version to see if this issue is still present? There
were lots of changes between 2.41.0 and 2.59.0.

 Jan

On 9/17/24 17:49, Lydian Lee wrote:

Hi,

We are using Beam Python SDK with Flink Runner, the
Beam version is 2.41.0 and the Flink version is 1.15.4.

We have a pipeline that has 2 stages:
1. read from kafka and fixed window for every 1 minute
2. aggregate the data for the past 1 minute and
reshuffle so that we have less partition count and
write them into s3.

We disabled enable.auto.commit and enabled
commit_offset_in_finalize. Also, auto.offset.reset is set to "latest".


Re: Beam dropping events from Kafka after reshuffle ?

2024-09-18 Thread Jan Lukavský

Hi Lydian,

in that case, there is only generic advice you can look into. 
Reshuffle is a stateless operation that should not cause dropping data. 
A GroupByKey, on the other hand, is stateful and thus can - when dealing 
with late data - drop some of it. You should be able to confirm this by 
looking for the 'droppedDueToLateness' counter and/or the log here [1]. This 
happens when an element arrives after the watermark passes the element's 
timestamp minus the allowed lateness. If you see the log, you might need to 
either change how you assign timestamps to elements (e.g. use log append time) 
or increase the allowed lateness of your windowfn.


Best,

 Jan

[1] 
https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
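
For the Python cross-language ReadFromKafka shown earlier in this thread, the first option roughly corresponds to its `timestamp_policy` argument; a hedged sketch follows (the consumer config and topic are placeholders, and the exact name of the log-append-time constant should be double-checked against the installed SDK version):

```
from apache_beam.io.kafka import ReadFromKafka

# Stamp elements with the broker's log-append time at the source instead of
# reassigning timestamps downstream; the default policy is processing time.
read_from_kafka = ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "broker:9092",
        "group.id": "group-name",
    },
    topics=["topic-name"],
    timestamp_policy=ReadFromKafka.log_append_time,
)
```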


On 9/18/24 08:53, Lydian Lee wrote:
I would love to, but there are some limitations on our end, so the 
version bump won't happen soon. Thus I need to figure out what 
the root cause might be.



On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský  wrote:

Hi Lydian,

2.41.0 is quite old, can you please try current version to see if
this issue is still present? There were lots of changes between
2.41.0 and 2.59.0.

 Jan

On 9/17/24 17:49, Lydian Lee wrote:

Hi,

We are using Beam Python SDK with Flink Runner, the Beam version
is 2.41.0 and the Flink version is 1.15.4.

We have a pipeline that has 2 stages:
1. read from kafka and fixed window for every 1 minute
2. aggregate the data for the past 1 minute and reshuffle so that
we have less partition count and write them into s3.

We disabled enable.auto.commit and enabled
commit_offset_in_finalize. Also, auto.offset.reset is set to
"latest".

According to the log, I can definitely see that the data is being consumed
from the Kafka offset, because there are many
```
Resetting offset for topic - to offset 
```
and that partition/offset pair does match the missing records.
However, they don't show up in the final S3 output.

My current hypothesis is that the shuffling might be the reason
for the issue. For example, originally in Kafka for the past
minute in partition 1 I have records at offsets 1, 2, 3. After
the reshuffle they are distributed, for example, as:
- partition A: 1, 3
- partition B: 2

If partition A finishes successfully but partition B fails, then -
given that A succeeded - it will commit its offset to Kafka,
and thus Kafka now has the offset at 3. When Kafka retries,
it will skip offset 2. However, I am not sure how exactly
the offset commit works and how it interacts with the
checkpoints. If my hypothesis were correct, it seems we
should be seeing more missing records, yet this happens
rarely. Wondering if anyone can help identify potential
root causes? Thanks





Re: Beam dropping events from Kafka after reshuffle ?

2024-09-17 Thread Jan Lukavský

Hi Lydian,

2.41.0 is quite old, can you please try current version to see if this 
issue is still present? There were lots of changes between 2.41.0 and 
2.59.0.


 Jan

On 9/17/24 17:49, Lydian Lee wrote:

Hi,

We are using Beam Python SDK with Flink Runner, the Beam version is 
2.41.0 and the Flink version is 1.15.4.


We have a pipeline that has 2 stages:
1. read from kafka and fixed window for every 1 minute
2. aggregate the data for the past 1 minute and reshuffle so that we 
have less partition count and write them into s3.


We disabled enable.auto.commit and enabled 
commit_offset_in_finalize. Also, auto.offset.reset is set to "latest".

According to the log, I can definitely see that the data is being consumed from 
the Kafka offset, because there are many

```
Resetting offset for topic -  to offset 
```
and that partition/offset pair does match the missing records.
However, they don't show up in the final S3 output.


My current hypothesis is that the shuffling might be the reason for 
the issue. For example, originally in Kafka for the past minute in 
partition 1 I have records at offsets 1, 2, 3. After the reshuffle 
they are distributed, for example, as:

- partition A: 1, 3
- partition B: 2

If partition A finishes successfully but partition B fails, then - given 
that A succeeded - it will commit its offset to Kafka, and thus 
Kafka now has the offset at 3. When Kafka retries, it will skip 
offset 2. However, I am not sure how exactly the offset commit 
works and how it interacts with the checkpoints. If my hypothesis 
were correct, it seems we should be seeing more missing records, 
yet this happens rarely. Wondering if anyone can help identify 
potential root causes? Thanks






Re: beam using flink runner to achive data locality in a distributed setup?

2024-07-02 Thread Jan Lukavský
Unfortunately, no. At least not in the case of the FlinkRunner. As already 
mentioned, Beam does not currently collect information about the location of 
source splits, thus this information cannot be passed to Flink.


> If there is no locality-aware processing the whole thing falls on 
its face.


A 1 Gbps network (current networks should actually be at least 10 Gbps) 
is quite "close" to the throughput of a single spinning disk. On the other hand, 
if the target is "seconds", you might want to have a look at some SQL-based 
distributed analytical engines; Flink's startup time itself will likely 
add significant overhead on top of the processing time.


On 7/2/24 16:03, Balogh, György wrote:

Hi Jan,
I need to process hundreds of GBs of data within seconds. With local 
data processing I can properly size a hw infrastructure to meet this 
(a couple of years back I did this with Hadoop, and it worked perfectly). If 
there is no locality-aware processing the whole thing falls on its 
face.


This comment suggests flink might do this under the hood?

https://stackoverflow.com/questions/38672091/flink-batch-data-local-planning-on-hdfs
Br,
Gyorgy


On Tue, Jul 2, 2024 at 3:08 PM Jan Lukavský  wrote:

Hi Gyorgy,

there is no concept of 'data locality' in Beam that would be
analogous to how MapReduce used to work. The fact that tasks
(compute) are co-located with storage on input is not transferred
to Beam Flink pipelines. The whole concept is kind of ill defined
in terms of Beam model, where tasks can be (at least in theory,
depending on a runner) moved between workers in a distributed
environment. The reason for this is that throughput (and cost) is
dominated mostly by the ability to (uniformly) scale, not the
costs associated with network transfers (this is actually most
visible in the streaming case, where the data is already 'in
motion'). The most common case in Beam is that compute is
completely separated from storage (possible even in the extreme
cases where streaming state is stored outside the compute of
streaming pipeline - but cached locally). The resulting
'stateless' nature of workers generally enables easier and more
flexible scaling.

Having said that, although Beam currently does not (AFAIK) try to
leverage local reads, it _could_ be possible by a reasonable
extension to how splittable DoFn [1] works so that it could make
use of data locality. It would be non-trivial, though, and would
definitely require support from the runner (Flink in this case).

My general suggestion would be to implement a prototype and
measure the throughput, and the part of it possibly related to networking,
before attempting to dig deeper into how to implement this in the Beam
Flink runner.

Best,

 Jan

[1] https://beam.apache.org/blog/splittable-do-fn/

On 7/2/24 10:46, Balogh, György wrote:

Hi Jan,
Separating live and historic storage makes sense. I need a
historic storage that can ensure data local processing using the
beam - flink stack.
Can I surely achieve this with HDFS? I can colocate hdfs nodes
with flink workers. What exactly enforces that flink nodes will
read local and not remote data?
Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský  wrote:

Hi Gyorgy,

comments inline.

On 7/1/24 15:10, Balogh, György wrote:

Hi Jan,
Let me add a few more details to show the full picture. We
have live datastreams (video analysis metadata) and we would
like to run both live and historic pipelines on the metadata
(eg.: live alerts, historic video searches).

This should be fine due to Beam's unified model. You can
write a PTransform that handles PCollection<...> without the
need to worry if the PCollection was created from Kafka or
some bounded source.

We planned to use kafka to store the streaming data and
directly run both types of queries on top. You are
suggesting to consider having kafka with small retention to
server the live queries and store the historic data
somewhere else which scales better for historic queries? We
need to have on prem options here. What options should we
consider that scales nicely (in terms of IO parallelization)
with beam? (eg. hdfs?)


Yes, I would not say necessarily "small" retention, but
probably "limited" retention. Running on premise you can
choose from HDFS or maybe S3 compatible minio or some other
distributed storage, depends on the scale and deployment
options (e.g. YARN or k8s).

I also happen to work on a system which targets exactly these
streaming-batch workloads (persisting upserts from stream to
batch for reprocessing), see [1]. Please feel free to 

Re: beam using flink runner to achive data locality in a distributed setup?

2024-07-02 Thread Jan Lukavský

Hi Gyorgy,

there is no concept of 'data locality' in Beam that would be analogous 
to how MapReduce used to work. The fact that tasks (compute) are 
co-located with storage on input is not transferred to Beam Flink 
pipelines. The whole concept is kind of ill defined in terms of Beam 
model, where tasks can be (at least in theory, depending on a runner) 
moved between workers in a distributed environment. The reason for this 
is that throughput (and cost) is dominated mostly by the ability to 
(uniformly) scale, not the costs associated with network transfers (this 
is actually most visible in the streaming case, where the data is 
already 'in motion'). The most common case in Beam is that compute is 
completely separated from storage (possible even in the extreme cases 
where streaming state is stored outside the compute of streaming 
pipeline - but cached locally). The resulting 'stateless' nature of 
workers generally enables easier and more flexible scaling.


Having said that, although Beam currently does not (AFAIK) try to 
leverage local reads, it _could_ be possible by a reasonable extension 
to how splittable DoFn [1] works so that it could make use of data 
locality. It would be non-trivial, though, and would definitely require 
support from the runner (Flink in this case).


My general suggestion would be to implement a prototype and measure 
the throughput, and the part of it possibly related to networking, before 
attempting to dig deeper into how to implement this in the Beam Flink runner.


Best,

 Jan

[1] https://beam.apache.org/blog/splittable-do-fn/
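
As an illustration of such a prototype measurement, a rough read-throughput probe could look like the sketch below (written in the Python SDK for brevity; the helper name, path pattern and options are assumptions, and hdfs:// paths additionally require the SDK's HDFS filesystem options to be configured):

```
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def measure_read_throughput(pattern, pipeline_args):
    """Reads a file set, sums the bytes seen and logs the wall-clock time taken."""
    start = time.time()
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        _ = (
            p
            | "Read" >> beam.io.ReadFromText(pattern)
            | "SizeOf" >> beam.Map(lambda line: len(line.encode("utf-8")))
            | "Total" >> beam.CombineGlobally(sum)
            | "Report" >> beam.Map(lambda total: logging.info("read %d bytes", total))
        )
    logging.info("wall time: %.1f s", time.time() - start)

# e.g. measure_read_throughput("hdfs://namenode/data/part-*", ["--runner=FlinkRunner"])
```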

On 7/2/24 10:46, Balogh, György wrote:

Hi Jan,
Separating live and historic storage makes sense. I need a historic 
storage that can ensure data local processing using the beam - flink 
stack.
Can I surely achieve this with HDFS? I can colocate hdfs nodes with 
flink workers. What exactly enforces that flink nodes will read local 
and not remote data?

Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský  wrote:

Hi Gyorgy,

comments inline.

On 7/1/24 15:10, Balogh, György wrote:

Hi Jan,
Let me add a few more details to show the full picture. We have
live datastreams (video analysis metadata) and we would like to
run both live and historic pipelines on the metadata (eg.: live
alerts, historic video searches).

This should be fine due to Beam's unified model. You can write a
PTransform that handles PCollection<...> without the need to worry
if the PCollection was created from Kafka or some bounded source.

We planned to use kafka to store the streaming data and
directly run both types of queries on top. You are suggesting to
consider having kafka with small retention to serve the live
queries and store the historic data somewhere else which scales
better for historic queries? We need to have on prem options
here. What options should we consider that scales nicely (in
terms of IO parallelization) with beam? (eg. hdfs?)


Yes, I would not say necessarily "small" retention, but probably
"limited" retention. Running on premise you can choose from HDFS
or maybe S3 compatible minio or some other distributed storage,
depends on the scale and deployment options (e.g. YARN or k8s).

I also happen to work on a system which targets exactly these
streaming-batch workloads (persisting upserts from stream to batch
for reprocessing), see [1]. Please feel free to contact me
directly if this sounds interesting.

Best,

 Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform


    Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:

Hi Gyorgy,

I don't think it is possible to co-locate tasks as you
describe it. Beam has no information about location of
'splits'. On the other hand, if batch throughput is the main
concern, then reading from Kafka might not be the optimal
choice. Although Kafka provides tiered storage for offloading
historical data, it still somewhat limits scalability (and
thus throughput), because the data have to be read by a
broker and only then passed to a consumer. The parallelism is
therefore limited by the number of Kafka partitions and not
parallelism of the Flink job. A more scalable approach could
be to persist data from Kafka to a batch storage (e.g. S3 or
GCS) and reprocess it from there.

Best,

 Jan

On 6/29/24 09:12, Balogh, György wrote:

Hi,
I'm planning a distributed system with multiple kafka
brokers co located with flink workers.
Data processing throughput for historic queries is a main
KPI. So I want to make sure all flink workers read local
data and not remote. I'm defining my pipelines in beam using
java.

Re: beam using flink runner to achive data locality in a distributed setup?

2024-07-01 Thread Jan Lukavský

Hi Gyorgy,

comments inline.

On 7/1/24 15:10, Balogh, György wrote:

Hi Jan,
Let me add a few more details to show the full picture. We have live 
datastreams (video analysis metadata) and we would like to run both 
live and historic pipelines on the metadata (eg.: live alerts, 
historic video searches).
This should be fine due to Beam's unified model. You can write a 
PTransform that handles PCollection<...> without the need to worry if 
the PCollection was created from Kafka or some bounded source.
We planned to use kafka to store the streaming data and directly run 
both types of queries on top. You are suggesting to consider 
having kafka with small retention to serve the live queries and store 
the historic data somewhere else which scales better for historic 
queries? We need to have on prem options here. What options should we 
consider that scales nicely (in terms of IO parallelization) with 
beam? (eg. hdfs?)


Yes, I would not say necessarily "small" retention, but probably 
"limited" retention. Running on premise you can choose from HDFS or 
maybe S3 compatible minio or some other distributed storage, depends on 
the scale and deployment options (e.g. YARN or k8s).


I also happen to work on a system which targets exactly these 
streaming-batch workloads (persisting upserts from stream to batch for 
reprocessing), see [1]. Please feel free to contact me directly if this 
sounds interesting.


Best,

 Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform


Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:

Hi Gyorgy,

I don't think it is possible to co-locate tasks as you describe
it. Beam has no information about location of 'splits'. On the
other hand, if batch throughput is the main concern, then reading
from Kafka might not be the optimal choice. Although Kafka
provides tiered storage for offloading historical data, it still
somewhat limits scalability (and thus throughput), because the
data have to be read by a broker and only then passed to a
consumer. The parallelism is therefore limited by the number of
Kafka partitions and not parallelism of the Flink job. A more
scalable approach could be to persist data from Kafka to a batch
storage (e.g. S3 or GCS) and reprocess it from there.

Best,

 Jan

On 6/29/24 09:12, Balogh, György wrote:

Hi,
I'm planning a distributed system with multiple kafka brokers co
located with flink workers.
Data processing throughput for historic queries is a main KPI. So
I want to make sure all flink workers read local data and not
remote. I'm defining my pipelines in beam using java.
Is it possible? What are the critical config elements to achieve
this?
Thank you,
Gyorgy

-- 


György Balogh
CTO
E   gyorgy.bal...@ultinous.com
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com




--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com


Re: beam using flink runner to achive data locality in a distributed setup?

2024-07-01 Thread Jan Lukavský

Hi Gyorgy,

I don't think it is possible to co-locate tasks as you describe it. Beam 
has no information about location of 'splits'. On the other hand, if 
batch throughput is the main concern, then reading from Kafka might not 
be the optimal choice. Although Kafka provides tiered storage for 
offloading historical data, it still somewhat limits scalability (and 
thus throughput), because the data have to be read by a broker and only 
then passed to a consumer. The parallelism is therefore limited by the 
number of Kafka partitions and not parallelism of the Flink job. A more 
scalable approach could be to persist data from Kafka to a batch storage 
(e.g. S3 or GCS) and reprocess it from there.


Best,

 Jan

On 6/29/24 09:12, Balogh, György wrote:

Hi,
I'm planning a distributed system with multiple kafka brokers co 
located with flink workers.
Data processing throughput for historic queries is a main KPI. So I 
want to make sure all flink workers read local data and not remote. 
I'm defining my pipelines in beam using java.

Is it possible? What are the critical config elements to achieve this?
Thank you,
Gyorgy

--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 


Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Jan Lukavský
I don't use Kafka transactions, so I can only speculate. It seems that 
the transaction times out before being committed. Looking into the code [1], 
this could happen if there is a *huge* amount of work between checkpoints 
(i.e. checkpoints do not happen often enough). I'd suggest 
investigating the logs for messages coming from the 
KafkaExactlyOnceSink.


 Jan

[1] 
https://github.com/apache/beam/blob/a944bf87cd03d32105d87fc986ecba5b656683bc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L245


On 6/24/24 16:35, Ruben Vargas wrote:

On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský  wrote:

Hi,

the distribution of keys to workers might not be uniform when the
number of keys is comparable to the total parallelism. General advice would be:

   a) try to increase the number of keys (EOS parallelism in this case) to be
at least several times higher than the parallelism

Make sense, unfortunately I faced an error when I tried to put the
shards > partitions. :(

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat

Do I need to move any configuration to do that?

Thanks


   b) increase maxParallelism (default 128, maximum 32768), as it might
influence the assignment of keys to downstream workers

Best,

   Jan

On 6/21/24 05:25, Ruben Vargas wrote:

Image was not correctly attached. Sending it again. Sorry

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  wrote:

Hello guys, me again

I was trying to debug the issue with the  backpressure and I noticed
that even if I set the shards = 16, not all tasks are receiving
messages (attaching screenshot). You know potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:

Hello again

Thank you for all the suggestions.

Unfortunately if I put more shards than partitions it throws me this exception

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
6ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:

Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it
explains it quite well. Generally, I would prefer unaligned checkpoints
in this case.

Another thing to consider is the number of shards of the EOS sink.
Because how the shards are distributed among workers, it might be good
idea to actually increase that to some number higher than number of
target partitions (e.g. targetPartitions * 10 or so). Additional thing
to consider is increasing maxParallelism of the pipeline (e.g. max value
is 32768), as it also affects how 'evenly' Flink assigns shards to
workers. You can check if the assignment is even using counters in the
sink operator(s).

Jan

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On 6/19/24 05:15, Ruben Vargas wrote:

Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

- Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to rec

Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Jan Lukavský

Hi,

the distribution of keys to workers might not be uniform when the 
number of keys is comparable to the total parallelism. General advice would be:


 a) try to increase the number of keys (EOS parallelism in this case) to be 
at least several times higher than the parallelism


 b) increase maxParallelism (default 128, maximum 32768), as it might 
influence the assignment of keys to downstream workers


Best,

 Jan

On 6/21/24 05:25, Ruben Vargas wrote:

Image was not correctly attached. Sending it again. Sorry

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  wrote:

Hello guys, me again

I was trying to debug the issue with the  backpressure and I noticed
that even if I set the shards = 16, not all tasks are receiving
messages (attaching screenshot). You know potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:

Hello again

Thank you for all the suggestions.

Unfortunately if I put more shards than partitions it throws me this exception

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
6ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:

Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it
explains it quite well. Generally, I would prefer unaligned checkpoints
in this case.

Another thing to consider is the number of shards of the EOS sink.
Because how the shards are distributed among workers, it might be good
idea to actually increase that to some number higher than number of
target partitions (e.g. targetPartitions * 10 or so). Additional thing
to consider is increasing maxParallelism of the pipeline (e.g. max value
is 32768), as it also affects how 'evenly' Flink assigns shards to
workers. You can check if the assignment is even using counters in the
sink operator(s).

   Jan

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On 6/19/24 05:15, Ruben Vargas wrote:

Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

   - Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  wrote:

Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


At the end I found one problem that was preventing  flink from doing
the checkpointing. It was a DoFn function that has some "non
serializable" objects, so I made those transient and initialized those
on the setup.

Weird, because I usually was able to detect these kinds of errors just
running in the direct runner, or even in flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work, my shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tu

Re: Exactly once KafkaIO with flink runner

2024-06-19 Thread Jan Lukavský

Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it 
explains it quite well. Generally, I would prefer unaligned checkpoints 
in this case.


Another thing to consider is the number of shards of the EOS sink. 
Because of how the shards are distributed among workers, it might be a good 
idea to actually increase that to some number higher than the number of 
target partitions (e.g. targetPartitions * 10 or so). An additional thing 
to consider is increasing the maxParallelism of the pipeline (e.g. the max value 
is 32768), as it also affects how 'evenly' Flink assigns shards to 
workers. You can check whether the assignment is even using counters in the 
sink operator(s).


 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/


On 6/19/24 05:15, Ruben Vargas wrote:

Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

  - Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  wrote:

Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


At the end I found one problem that was preventing  flink from doing
the checkpointing. It was a DoFn function that has some "non
serializable" objects, so I made those transient and initialized those
on the setup.

Weird, because I usually was able to detect these kinds of errors just
running in the direct runner, or even in flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work, my shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:

I'd suggest:
  a) use unaligned checkpoints, if possible

  b) verify the number of buckets you use for EOS sink, this limits parallelism 
[1].

Best,

  Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-

On 6/18/24 09:32, Ruben Vargas wrote:

Hello Lukavsky

Thanks for your reply !

I thought it was due to backpressure but I increased the resources of the cluster 
and the problem still persists. More than that, data stops flowing and the checkpoint 
still fails.

I have configured the checkpoint to run every minute. The timeout is 1h. It is an 
aligned checkpoint.

On Tue, Jun 18, 2024 at 1:14 AM, Jan Lukavský wrote:

Hi Ruben,

from the provided screenshot it seems to me that the pipeline is
backpressured by the sink. Can you please share your checkpoint
configuration? Are you using unaligned checkpoints? What is the
checkpointing interval and the volume of data coming in from the source?
With EOS data is committed after checkpoint, before that, the data is
buffered in state, which makes the sink more resource intensive.

   Jan

On 6/18/24 05:30, Ruben Vargas wrote:

Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  wrote:

Hello guys

Wondering if some of you have experiences enabling Exactly Once in
KafkaIO with Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception on
the logs.

Flink console only mentions this "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpointing  (Attached a screenshot).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)

But my application 

Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Jan Lukavský

I'd suggest:
 a) use unaligned checkpoints, if possible

 b) verify the number of buckets you use for EOS sink, this limits 
parallelism [1].


Best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-


On 6/18/24 09:32, Ruben Vargas wrote:

Hello Lukavsky

Thanks for your reply !

I thought it was due to backpressure but I increased the resources of the 
cluster and the problem still persists. More than that, data stops flowing 
and the checkpoint still fails.


I have configured the checkpoint to run every minute. The timeout is 
1h. It is an aligned checkpoint.


On Tue, Jun 18, 2024 at 1:14 AM, Jan Lukavský wrote:


Hi Ruben,

from the provided screenshot it seems to me that the pipeline is
backpressured by the sink. Can you please share your checkpoint
configuration? Are you using unaligned checkpoints? What is the
checkpointing interval and the volume of data coming in from the
source?
With EOS data is committed after checkpoint, before that, the data is
buffered in state, which makes the sink more resource intensive.

  Jan

On 6/18/24 05:30, Ruben Vargas wrote:
> Attached a better image of the console.
>
> Thanks!
>
> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas
 wrote:
>> Hello guys
>>
>> Wondering if some of you have experiences enabling Exactly Once in
>> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
>> where all the checkpoints are failing. I cannot see any
exception on
>> the logs.
>>
>> Flink console only mentions this "Asynchronous task checkpoint
>> failed." I also noticed that some operators don't acknowledge the
>> checkpointing  (Attached a screenshot).
>>
>> I did this:
>>
>> 1) KafkaIO.Read:
>>
>> update consumer properties with enable.auto.commit = false
>> .withReadCommitted()
>> .commitOffsetsInFinalize()
>>
>> 2) KafkaIO#write:
>>
>> .withEOS(numShards, sinkGroupId)
>>
>> But my application is not able to deliver messages to the
output topic
>> due the checkpoint failing.
>> I also reviewed the timeout and other time sensitive
parameters, those
>> are high right now.
>>
>> I really appreciate your guidance on this. Thank you


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Jan Lukavský

Hi Ruben,

from the provided screenshot it seems to me that the pipeline is 
backpressured by the sink. Can you please share your checkpoint 
configuration? Are you using unaligned checkpoints? What is the 
checkpointing interval and the volume of data coming in from the source? 
With EOS data is committed after checkpoint, before that, the data is 
buffered in state, which makes the sink more resource intensive.


 Jan

On 6/18/24 05:30, Ruben Vargas wrote:

Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  wrote:

Hello guys

Wondering if some of you have experiences enabling Exactly Once in
KafkaIO with Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception on
the logs.

Flink console only mentions this "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpointing  (Attached a screenshot).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)

But my application is not able to deliver messages to the output topic
due the checkpoint failing.
I also reviewed the timeout and other time sensitive parameters, those
are high right now.

I really appreciate your guidance on this. Thank you


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-20 Thread Jan Lukavský

Does the description in [1] match your case?

[1] https://github.com/apache/beam/issues/31085#issuecomment-2115304242

On 5/19/24 10:07, Yarden BenMoshe wrote:

I am not running my pipeline from the command line, so I used instead:
options.setExperiments(Collections.singletonList("use_deprecated_read"));

with ExperimentalOptions added to my options interface. However, I don't 
think using it has any effect. In terms of the watermark, I 
received again:
WatermarkHold.addHolds: element hold at 2024-05-19T07:52:59.999Z is on 
time for key:aaa-bbb-ccc; 
window:[2024-05-19T07:52:00.000Z..2024-05-19T07:53:00.000Z); 
inputWatermark:-290308-12-21T19:59:05.225Z; 
outputWatermark:-290308-12-21T19:59:05.225Z




On Thu, May 16, 2024 at 17:06 Jan Lukavský <je...@seznam.cz> wrote:


Does using --experiments=use_deprecated_read have any effect?

On 5/16/24 14:30, Yarden BenMoshe wrote:

Hi Jan, my PipelineOptions is as follows:
options.setStreaming(true);
options.setAttachedMode(false);
options.setRunner(FlinkRunner.class);

I've also tried adding:
options.setAutoWatermarkInterval(100L);
as seen in some github issue, without any success so far.

other than that, i am working with parallelism:3 and number of
task slots: 3

Thanks!
Yarden

On Thu, May 16, 2024 at 15:05 Jan Lukavský <je...@seznam.cz> wrote:

Hi Yarden,

can you please provide all flink-related PipelineOptions you
use for the
job?

  Jan

On 5/16/24 13:44, Yarden BenMoshe wrote:
> Hi all,
> I have a project running with Beam 2.51, using Flink
runner. In one of
> my pipelines i have a FixedWindow and had a problem
upgrading until
> now, with a timers issue now resolved, and hopefully
allowing me to
> upgrade to version 2.56
> However, I encounter another problem now which I believe is
related to
> watermarking(?).
> My pipeline's source is a kafka topic.
> My basic window definition is:
>
> PCollection>>
windowCustomObjectInfo
> = customObject.apply("windowCustomObjectInfo",
>

Window.into(FixedWindows.of(Duration.standardSeconds(60.apply(GroupByKey.create());
>
> and ever since upgrading to version 2.56 I am not getting
any output
> from that window. when enabling TRACE logs, i have this
message:
>
> 2024-05-12 13:50:55,257 TRACE
org.apache.beam.sdk.util.WindowTracing
> [] - WatermarkHold.addHolds: element hold at
2024-05-12T13:50:59.999Z
> is on time for key:test-12345;
> window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
> inputWatermark:-290308-12-21T19:59:05.225Z;
> outputWatermark:-290308-12-21T19:59:05.225Z
>
>
> Any hints on where should I look or maybe how I can adjust
my window
> definition? Are you familiar with any change that might be
the cause
> for my issue?
> Thanks


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-16 Thread Jan Lukavský

Does using --experiments=use_deprecated_read have any effect?

On 5/16/24 14:30, Yarden BenMoshe wrote:

Hi Jan, my PipelineOptions is as follows:
options.setStreaming(true);
options.setAttachedMode(false);
options.setRunner(FlinkRunner.class);

I've also tried adding:
options.setAutoWatermarkInterval(100L);
as seen in some GitHub issue, without any success so far.

Other than that, I am working with parallelism: 3 and number of task 
slots: 3


Thanks!
Yarden

On Thu, May 16, 2024 at 15:05 Jan Lukavský <je...@seznam.cz> wrote:


Hi Yarden,

can you please provide all flink-related PipelineOptions you use
for the
job?

  Jan

On 5/16/24 13:44, Yarden BenMoshe wrote:
> Hi all,
> I have a project running with Beam 2.51, using Flink runner. In
one of
> my pipelines i have a FixedWindow and had a problem upgrading until
> now, with a timers issue now resolved, and hopefully allowing me to
> upgrade to version 2.56
> However, I encounter another problem now which I believe is
related to
> watermarking(?).
> My pipeline's source is a kafka topic.
> My basic window definition is:
>
> PCollection>>
windowCustomObjectInfo
> = customObject.apply("windowCustomObjectInfo",
>

Window.into(FixedWindows.of(Duration.standardSeconds(60.apply(GroupByKey.create());
>
> and ever since upgrading to version 2.56 I am not getting any
output
> from that window. when enabling TRACE logs, i have this message:
>
> 2024-05-12 13:50:55,257 TRACE
org.apache.beam.sdk.util.WindowTracing
> [] - WatermarkHold.addHolds: element hold at
2024-05-12T13:50:59.999Z
> is on time for key:test-12345;
> window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
> inputWatermark:-290308-12-21T19:59:05.225Z;
> outputWatermark:-290308-12-21T19:59:05.225Z
>
>
> Any hints on where should I look or maybe how I can adjust my
window
> definition? Are you familiar with any change that might be the
cause
> for my issue?
> Thanks


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-16 Thread Jan Lukavský

Hi Yarden,

can you please provide all flink-related PipelineOptions you use for the 
job?


 Jan

On 5/16/24 13:44, Yarden BenMoshe wrote:

Hi all,
I have a project running with Beam 2.51, using Flink runner. In one of 
my pipelines i have a FixedWindow and had a problem upgrading until 
now, with a timers issue now resolved, and hopefully allowing me to 
upgrade to version 2.56
However, I encounter another problem now which I believe is related to 
watermarking(?).

My pipeline's source is a kafka topic.
My basic window definition is:

PCollection>> windowCustomObjectInfo 
= customObject.apply("windowCustomObjectInfo", 
Window.into(FixedWindows.of(Duration.standardSeconds(60.apply(GroupByKey.create());


and ever since upgrading to version 2.56 I am not getting any output 
from that window. when enabling TRACE logs, i have this message:


2024-05-12 13:50:55,257 TRACE org.apache.beam.sdk.util.WindowTracing 
[] - WatermarkHold.addHolds: element hold at 2024-05-12T13:50:59.999Z 
is on time for key:test-12345; 
window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z); 
inputWatermark:-290308-12-21T19:59:05.225Z; 
outputWatermark:-290308-12-21T19:59:05.225Z



Any hints on where should I look or maybe how I can adjust my window 
definition? Are you familiar with any change that might be the cause 
for my issue?

Thanks


Re: On timer method are not triggred

2024-03-27 Thread Jan Lukavský
KafkaIO is translated using SDF by default. There were some changes in how
watermarks are emitted from Impulse (which is the SDF "starting point"),
which might affect what you see. Please try one of the following:


 a) use --shutdownSourcesAfterIdleMs (e.g. 5000) which should trigger 
watermark from the Impulse, though it might have some other unwanted 
consequences, or


 b) try upgrading to 2.53.0, which had some fixes around that

There were some other fixes around handling IO, so please consider
upgrading directly to 2.55.0 or at least use --autoWatermarkInterval (e.g.
100) for your Pipeline.
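A minimal sketch of setting these programmatically, assuming the FlinkPipelineOptions class from the Flink runner artifact (the values are only illustrative):

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setAutoWatermarkInterval(100L);        // emit watermarks every 100 ms
    options.setShutdownSourcesAfterIdleMs(5000L);  // shut idle sources down so a final watermark is emitted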


Hope this helps,

 Jan

On 3/27/24 13:51, Sigalit Eliazov wrote:

hi,
this is the pipeline, very simple one
the onTimer is not fired.
We are not using any experimental variables.

public class KafkaBeamPipeline {

    static class ProcessMessageFn extends DoFn<KV<String, String>, String> {

        @StateId("count")
        private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();

        @TimerId("eventTimer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(ProcessContext context,
                @StateId("count") ValueState<Integer> state,
                @TimerId("eventTimer") Timer timer) {

            Integer count = state.read();
            // some logic
            state.write(count);

            // Set a timer for one minute later
            timer.set(context.timestamp().plus(Duration.standardMinutes(1)));
            context.output("Current count: " + count);
        }

        @OnTimer("eventTimer")
        public void onTimer(OnTimerContext context,
                @StateId("count") ValueState<Integer> state) {

            state.write(0);
            System.out.println("Timer fired at " + context.timestamp());
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        pipeline
            .apply(KafkaIO.<String, String>read()
                    .withBootstrapServers("localhost:9092")
                    .withTopic("topic")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply(ParDo.of(new ProcessMessageFn()));

        pipeline.run().waitUntilFinish();
    }
}

thanks
Sigalit

On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský  wrote:

Hi,

what is your runner, is it Flink as well in the issue? What is the
source of your Pipeline? Do you use some additional flags, e.g.
--experiments? Do you see that using classical or portable runner?

  Jan

On 3/26/24 19:18, Sigalit Eliazov wrote:
> Hi all
> We encountered issue with timers starting from version 2.52.
>
> We saw that the timers are not triggered.
>
> https://github.com/apache/beam/issues/29816
>
> Did someone encounter such problems as well?
>
> Thanks
> Sigalit
>
>
>


Re: On timer method are not triggred

2024-03-27 Thread Jan Lukavský

Hi,

what is your runner, is it Flink as in the issue? What is the
source of your Pipeline? Do you use some additional flags, e.g.
--experiments? Do you see this with the classic or the portable runner?


 Jan

On 3/26/24 19:18, Sigalit Eliazov wrote:

Hi all
We encountered an issue with timers starting from version 2.52.

We saw that the timers are not triggered.

https://github.com/apache/beam/issues/29816

Did someone encounter such problems as well?

Thanks
Sigalit





Re: [QUESTION] confused about event timers.

2024-03-19 Thread Jan Lukavský

Hi,

because you use FileIO I suppose that your pipeline runs in batch mode.
Event-time timers in both batch and streaming mode depend on a
_watermark_ [1], which keeps track of the progress in event time. Batch
pipelines do not have a simple way to work with watermarks (data can
be arbitrarily delayed), so in batch pipelines event-time timers fire
only _at the end of the computation_. That is where all windows get
triggered and where all timers fire. Streaming pipelines have
information about progress in event time coming from sources, so they
fire timers the way one would expect.


Best,

 Jan

[1] https://beam.apache.org/documentation/basics/#watermark

On 3/19/24 15:40, LDesire wrote:

Hello Apache Beam community.

This might be a minor issue, but I'm writing a pipeline and I don't 
understand something, so I wanted to ask a question.


The pipeline looks like this

ReadFromHDFS -> ConvertToDto -> WithTimestamps -> WithKeys -> 
Windowing -> StateDoFn


Windowing used 1 hour FixedWindows.
I also configured the triggering with AfterWatermark, EarlyFiring, and 
LateFiring.


And in StateDoFn, I used an event timer, which I set to 1 minute in 
@ProcessElement.


And we put the elements into a BagState.

So, in OnTimer, shouldn't the BagState contain data from the start of 
the window to 1 minute after the event (i.e. 1 minute after the event 
time)?


I don't understand why it contains all the data within the window.

Here's the pipeline I wrote


pipeline.apply(FileIO.match().filepattern(options.getPath()))
    .apply(FileIO.readMatches())
    .apply(FileRead.of())
    .apply(ToForex.of())
    .apply(WithTimestamps.of(Forex::getTimestamp))
    .apply(WithKeys.of(Forex::getPid))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), ForexCoder.of()))
    .apply(Window.<KV<String, Forex>>into(FixedWindows.of(Duration.standardHours(1L)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10L)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(5L))
        .accumulatingFiredPanes())
    .apply(ParDo.of(new DoFn<KV<String, Forex>, KV<String, Double>>() {
        @StateId("buffer")
        private final StateSpec<@NonNull BagState<Forex>> bufferSpec =
            StateSpecs.bag(ForexCoder.of());

        @StateId("minTimestamp")
        private final StateSpec<@NonNull CombiningState<Long, long[], Long>> minTimestampSpec =
            StateSpecs.combining(Min.ofLongs());

        @TimerId("timer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        @RequiresTimeSortedInput
        public void process(
            @Timestamp Instant timestamp,
            @Element KV<String, Forex> element,
            @StateId("buffer") BagState<Forex> bufferState,
            @AlwaysFetched @StateId("minTimestamp") CombiningState<Long, long[], Long> minTimestampState,
            @TimerId("timer") Timer timer) {
          final long minTimestamp = Math.min(minTimestampState.read(), timestamp.getMillis());
          timer.withOutputTimestamp(Instant.ofEpochMilli(minTimestamp))
              .set(Instant.ofEpochMilli(minTimestamp).plus(Duration.standardMinutes(1L)));
          bufferState.add(element.getValue());
        }

        @OnTimer("timer")
        public void onTimer(
            @Key String key,
            @AlwaysFetched @StateId("buffer") BagState<Forex> bufferState,
            @AlwaysFetched @StateId("minTimestamp") CombiningState<Long, long[], Long> minTimestampState,
            @Timestamp Instant timestamp,
            OutputReceiver<KV<String, Double>> output,
            @TimerId("timer") Timer timer) {
          log.info("[onTimer] at {},", timestamp);
          log.info("[onTimer] key = {}, minTimestamp = {}, maxTimestamp = {}",
              key,
              Streams.stream(bufferState.read())
                  .min(Comparator.comparing(Forex::getTimestamp)).map(Forex::getTimestamp).get(),
              Streams.stream(bufferState.read())
                  .max(Comparator.comparing(Forex::getTimestamp)).map(Forex::getTimestamp).get());
          final double average = Streams.stream(bufferState.read())
              .mapToDouble(Forex::getLastNumeric).average().orElse(.0);
          output.output(KV.of(key, average));
          minTimestampState.clear();
          bufferState.clear();
          timer.clear();
        }
      }))



My expectation is that the difference between minTimestamp and 
maxTimestamp in the onTimer log should be at least 1 minute.


But it looks like the BagState contains data from all windows, even 
though the event timer is 1 minute.


Am I misunderstanding the EventTimer?

Thanks.


INFO: [onTimer] at 2024-03-18T18:55:29.000Z,
Mar 19, 2024 11:27:43 PM com.icloud.state.EventTimerExample$1 onTimer
INFO: [onTimer] at 2024-03-18T20:59:59.000Z,
Mar 19, 2024 11:27:43 PM com.icloud.state.EventTimerExample$1 onTimer
INFO: [onTimer] key = 1823, minTimestamp = 2024-03-18T18:01:13.000Z,
maxTimestamp = 2024-03-18T18:55:29.000Z

Mar 19, 2024 11:27:43 PM com.icloud.state.EventTimerExample$1 onTimer
INFO: [onTimer] key = 993168, minTimestamp = 2024-03-19T00:00:28.000Z,
maxTimestamp = 2024-03-19T00:16:15.000Z

Mar 19, 2024 11:27:43 PM com.icloud.state.EventTimerExample$1 onTimer
INFO: [onTimer] at 2024-03-19T02:59:32.000Z,
Mar 19, 2024 11:27:43 PM com.icloud.state.EventTimerExample$1 onTimer
INFO: [onTimer] key = 2112, minTimestamp = 2024-03-19T02:01:11.000Z,
maxTimestamp = 2024-03-19T02:59:32.000Z

Mar 19, 2

Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Jan Lukavský
Self-correction: as you are using a streaming pipeline without final
watermark emission (I suppose), option (a) will not work. Patching the
sink to support generic windowing would probably be much more involved.


On 3/14/24 14:07, Jan Lukavský wrote:


Hi Ondřej,

I'll start with a disclaimer; I'm not an expert on either the
Python SDK or ParquetIO, so please take these just as suggestions
off the top of my head.


First, it seems that the current implementation of WriteToParquet 
really does not play well with streaming pipelines. There are several 
options that could be used to overcome this limitation:


 a) you can try fixing the sink, maybe adding 
AfterWatermark.pastEndOfWindow() trigger might be enough to make it 
work (need to be tested)


 b) if the Java implementation of ParquetIO works for streaming 
pipelines (and I would suppose it does), you can use cross-language 
transform to run ParquetIO from python, see [1] for quick start


 c) generally speaking, using a full-blown streaming engine for tasks 
like "buffer this and store it in bulk after a timeout" is 
inefficient. Alternative approach would be just to use KafkaConsumer, 
create parquet files on local disk, push them to GCS and commit 
offsets afterwards. Streaming engines buffer data in replicated 
distributed state which adds unneeded complexity


 d) if there is some non-trivial processing between consuming elements 
from Kafka and writing outputs, then it might be an alternative to 
process the data in streaming pipeline, write outputs back to Kafka 
and then use approach (c) to get it to GCS


The specific solution depends on the actual requirements of your 
customers.


Best,

 Jan

[1] 
https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/


On 3/14/24 09:34, Ondřej Pánek wrote:


Basically, this is the error we receive when trying to use avro or 
parquet sinks (attached image).


Also, check the sample pipeline that triggers this error (when 
deploying with DataflowRunner). So obviously, there is no global 
window or default trigger. That’s, I believe, what’s described in the 
issue: https://github.com/apache/beam/issues/25598


*From: *Ondřej Pánek 
*Date: *Thursday, March 14, 2024 at 07:57
*To: *user@beam.apache.org 
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Hello, thanks for the reply!

Please, refer to these:

  * 
https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  * https://github.com/apache/beam/issues/25598

Best,

Ondrej

*From: *XQ Hu via user 
*Date: *Thursday, March 14, 2024 at 02:32
*To: *user@beam.apache.org 
*Cc: *XQ Hu 
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Can you explain more about " that current sinks for Avro and Parquet 
with the destination of GCS are not supported"?


We do have AvroIO and ParquetIO 
(https://beam.apache.org/documentation/io/connectors/) in Python.


On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek  
wrote:


Hello Beam team!

We’re currently onboarding a customer’s infrastructure to the
Google Cloud Platform. The decision was made that one of the
technologies they will use is Dataflow. Let me briefly describe the
use-case specification:

They have a Kafka cluster where data from a CDC data source is
stored. The data in the topics is stored in Avro format. Their
other requirement is a streaming solution
reading from these Kafka topics and writing to Google Cloud
Storage, again in Avro. What’s more, the component should be
written in Python, since their Data Engineers heavily prefer
Python over Java.

We’ve been struggling with the design of the solution for a couple
of weeks now, and we’re facing a quite unfortunate situation,
not really finding any solution that would fit these requirements.

So the question is: Is there any existing Dataflow
template/solution with the following specifications:

  * Streaming connector
  * Written in Python
  * Consumes from Kafka topics
  * Reads Avro with Schema Registry
  * Writes Avro to GCS

We found out that the current sinks for Avro and Parquet with
GCS as the destination are not supported for Python at the moment,
which is basically the main blocker now.

Any recommendations/suggestions would be really highly appreciated!

Maybe the solution really does not exist and we need to create
our own custom connector for it. The question in this case would
be if that’s even possible theoretically, since we would really
need to avoid another dead end.

Thanks a lot for any help!

Kind regards,

Ondrej


Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Jan Lukavský

Hi Ondřej,

I'll start with a disclaimer; I'm not an expert on either the
Python SDK or ParquetIO, so please take these just as suggestions
off the top of my head.


First, it seems that the current implementation of WriteToParquet really 
does not play well with streaming pipelines. There are several options 
that could be used to overcome this limitation:


 a) you can try fixing the sink, maybe adding 
AfterWatermark.pastEndOfWindow() trigger might be enough to make it work 
(need to be tested)


 b) if the Java implementation of ParquetIO works for streaming 
pipelines (and I would suppose it does), you can use cross-language 
transform to run ParquetIO from python, see [1] for quick start


 c) generally speaking, using a full-blown streaming engine for tasks 
like "buffer this and store it in bulk after a timeout" is inefficient. 
Alternative approach would be just to use KafkaConsumer, create parquet 
files on local disk, push them to GCS and commit offsets afterwards. 
Streaming engines buffer data in replicated distributed state which adds 
unneeded complexity


 d) if there is some non-trivial processing between consuming elements 
from Kafka and writing outputs, then it might be an alternative to 
process the data in streaming pipeline, write outputs back to Kafka and 
then use approach (c) to get it to GCS


The specific solution depends on the actual requirements of your customers.
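For illustration, option (b) relies on the fact that in the Java SDK a windowed FileIO write works for unbounded input; a hedged sketch follows, where the bucket path, the 5-minute window and the GenericRecord schema are placeholders, and this is roughly what a cross-language wrapper would invoke:

    // the input collection is assumed to have AvroCoder.of(schema) set
    PCollection<GenericRecord> records = ...;  // e.g. KafkaIO output converted to GenericRecord
    records
        .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply(FileIO.<GenericRecord>write()
            .via(AvroIO.sink(schema))          // or ParquetIO.sink(schema) for Parquet output
            .to("gs://my-bucket/output/")
            .withSuffix(".avro")
            .withNumShards(1));                // explicit sharding is required for unbounded input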

Best,

 Jan

[1] 
https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/


On 3/14/24 09:34, Ondřej Pánek wrote:


Basically, this is the error we receive when trying to use avro or 
parquet sinks (attached image).


Also, check the sample pipeline that triggers this error (when 
deploying with DataflowRunner). So obviously, there is no global 
window or default trigger. That’s, I believe, what’s described in the 
issue: https://github.com/apache/beam/issues/25598


*From: *Ondřej Pánek 
*Date: *Thursday, March 14, 2024 at 07:57
*To: *user@beam.apache.org 
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Hello, thanks for the reply!

Please, refer to these:

  * 
https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  * https://github.com/apache/beam/issues/25598

Best,

Ondrej

*From: *XQ Hu via user 
*Date: *Thursday, March 14, 2024 at 02:32
*To: *user@beam.apache.org 
*Cc: *XQ Hu 
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Can you explain more about " that current sinks for Avro and Parquet 
with the destination of GCS are not supported"?


We do have AvroIO and ParquetIO 
(https://beam.apache.org/documentation/io/connectors/) in Python.


On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek  
wrote:


Hello Beam team!

We’re currently onboarding a customer’s infrastructure to the Google
Cloud Platform. The decision was made that one of the technologies
they will use is Dataflow. Let me briefly describe the use-case specification:

They have a Kafka cluster where data from a CDC data source is stored.
The data in the topics is stored in Avro format. Their other
requirement is a streaming solution reading from
these Kafka topics and writing to Google Cloud Storage, again
in Avro. What’s more, the component should be written in Python,
since their Data Engineers heavily prefer Python over Java.

We’ve been struggling with the design of the solution for a couple
of weeks now, and we’re facing a quite unfortunate situation,
not really finding any solution that would fit these requirements.

So the question is: Is there any existing Dataflow
template/solution with the following specifications:

  * Streaming connector
  * Written in Python
  * Consumes from Kafka topics
  * Reads Avro with Schema Registry
  * Writes Avro to GCS

We found out that the current sinks for Avro and Parquet with
GCS as the destination are not supported for Python at the moment,
which is basically the main blocker now.

Any recommendations/suggestions would be really highly appreciated!

Maybe the solution really does not exist and we need to create our
own custom connector for it. The question in this case would be if
that’s even possible theoretically, since we would really need to
avoid another dead end.

Thanks a lot for any help!

Kind regards,

Ondrej


Re: Beam portable runner setup for Flink + Python on Kubernetes

2024-02-23 Thread Jan Lukavský

Hi,

I have set up such a configuration for a local environment (minikube); it
can be found at [1] and [2]. It is somewhat older, but it might serve as
inspiration. If you would like to write up your solution for the
documentation, that would be awesome; I'd be happy to review it. :)


Best,
 Jan

[1] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/manifests/flink.yaml


[2] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile


On 2/23/24 00:48, Jaehyeon Kim wrote:

Hello,

I'm playing with the beam portable runner to read/write data from 
Kafka. I see a spark runner example on Kubernetes 
(https://beam.apache.org/documentation/runners/spark/#kubernetes) but 
the flink runner section doesn't include such an example.


Is there a resource that I can learn from? Ideally it would be good if it were
covered in the documentation.


Cheers,
Jaehyeon


Re: Can apache beam be used for control flow (ETL workflow)

2023-12-15 Thread Jan Lukavský

Hi,

Apache Beam describes itself as "Apache Beam is an open-source, unified 
programming model for batch and streaming data processing pipelines, 
...". As such, it is possible to use it to express essentially arbitrary 
logic and run it as a streaming pipeline. A streaming pipeline processes 
input data and produces output data and/or actions. Given these 
assumptions, it is technically feasible to use Apache Beam for
orchestrating other workflows; the problem is that it will very
likely not be efficient. Apache Beam does a lot of heavy lifting related
to the fact that it is designed to process large volumes of data in a
scalable way, which is probably not what one would need for workflow
orchestration. So, my two cents would be that although it _could_ be
done, it probably _should not_ be done.


Best,

 Jan

On 12/15/23 13:39, Mikhail Khludnev wrote:

Hello,
I think this page 
https://beam.apache.org/documentation/ml/orchestration/ might answer 
your question.

Frankly speaking: GCP Workflows and Apache Airflow.
But Beam itself is a data-stream/flow or batch processor; not a 
workflow engine (IMHO).


On Fri, Dec 15, 2023 at 3:13 PM data_nerd_666  
wrote:


I know it is technically possible, but my case may be a little
special. Say I have 3 steps for my control flow (ETL workflow):
Step 1. upstream file watching
Step 2. call some external service to run one job, e.g. run a
notebook, run a python script
Step 3. notify downstream workflow
Can I use apache beam to build a DAG with 3 nodes and run this as
either flink or spark job.  It might be a little weird, but I just
want to learn from the community whether this is the right way to
use apache beam, and has anyone done this before? Thanks



On Fri, Dec 15, 2023 at 10:28 AM Byron Ellis via user
 wrote:

It’s technically possible but the closest thing I can think of
would be triggering things based on things like file watching.

On Thu, Dec 14, 2023 at 2:46 PM data_nerd_666
 wrote:

Not using beam as time-based scheduler, but just use it to
control execution orders of ETL workflow DAG, because
beam's abstraction is also a DAG.
I know it is a little weird, just want to confirm with the
community, has anyone used beam like this before?



On Thu, Dec 14, 2023 at 10:59 PM Jan Lukavský
 wrote:

Hi,

can you give an example of what you mean for better
understanding? Do
you mean using Beam as a scheduler of other ETL workflows?

  Jan

On 12/14/23 13:17, data_nerd_666 wrote:
> Hi all,
>
> I am new to apache beam, and am very excited to find
beam in apache
> community. I see lots of use cases of using apache
beam for data flow
> (process large amount of batch/streaming data). I am
just wondering
> whether I can use apache beam for control flow (ETL
workflow). I don't
> mean the spark/flink job in the ETL workflow, I mean
the ETL workflow
> itself. Because ETL workflow is also a DAG which is
very similar as
> the abstraction of apache beam, but unfortunately I
didn't find such
> use cases on internet. So I'd like to ask this
question in beam
> community to confirm whether I can use apache beam
for control flow
> (ETL workflow). If yes, please let me know some
success stories of
> this. Thanks
>
>
>



--
Sincerely yours
Mikhail Khludnev

Re: Can apache beam be used for control flow (ETL workflow)

2023-12-14 Thread Jan Lukavský

Hi,

can you give an example of what you mean for better understanding? Do 
you mean using Beam as a scheduler of other ETL workflows?


 Jan

On 12/14/23 13:17, data_nerd_666 wrote:

Hi all,

I am new to Apache Beam, and am very excited to find Beam in the Apache
community. I see lots of use cases of using Apache Beam for data flow
(processing large amounts of batch/streaming data). I am just wondering
whether I can use Apache Beam for control flow (ETL workflow). I don't
mean the Spark/Flink job in the ETL workflow, I mean the ETL workflow
itself. An ETL workflow is also a DAG, which is very similar to
the abstraction of Apache Beam, but unfortunately I didn't find such
use cases on the internet. So I'd like to ask this question in the Beam
community to confirm whether I can use Apache Beam for control flow
(ETL workflow). If yes, please let me know some success stories of
this. Thanks






Re: Pipeline Stalls at GroupByKey Step

2023-11-20 Thread Jan Lukavský

Hi Sigalit,

you should set a TimestampPolicyFactory [1] on the source, because
otherwise resetting the timestamp in a plain ParDo
(ConvertFromKafkaRecord) can cause the element's timestamp to shift back
in time behind the watermark and subsequently cause the data to get dropped
by the GroupByKey transform. If you don't set any watermark policy
explicitly, the default is processing time, which is likely causing the
effect you observe. An alternative option is to use state and timers in the
ConvertFromKafkaRecord transform to make sure that the transform
correctly holds the output watermark using Timer.withOutputTimestamp
[2]. I'd go with option 1) though.
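A minimal sketch of option 1), assuming KafkaTransform.readAvroMessageFromKafka wraps a plain KafkaIO.read() to which the policy can be passed (the 1-minute maximum delay is only an illustrative value):

    pipeline
        .apply("readIpSessionInfo",
            KafkaIO.<String, IpSessionInfo>read()
                .withBootstrapServers(pipelineUtil.getBootstrapServers())
                .withTopic(options.getSourceKafkaTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(IpSessionInfoDeserializer.class)
                // derive element timestamps and the watermark from the Kafka record timestamp
                .withTimestampPolicyFactory(
                    TimestampPolicyFactory.withCreateTime(Duration.standardMinutes(1)))
                .withoutMetadata())

With timestamps assigned at the source, the ConvertFromKafkaRecord step no longer needs to call outputWithTimestamp(..., Instant.now()).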


Best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
[2] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-


On 11/17/23 08:22, Sigalit Eliazov wrote:

yes, the output of ConvertFromKafkaRecord is with timestamp
KafkaRecord record = c.element();
KV entry = Objects.requireNonNull(record).getKV();
String key = convertKey(entry.getKey());
T value = convertValue(entry.getValue());
c.outputWithTimestamp(KV.of(key, value), Instant.now());
thanks
Sigalit

On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal  wrote:

Do you add time stamp to every record you output in
ConvertFromKafkaRecord step or any step before that.

On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov
 wrote:

Hi,

In our pipeline, we've encountered an issue with the
GroupByKey step. After some time of running, it seems that
messages are not progressing through the GroupByKey step,
causing the pipeline to stall in data processing.

To troubleshoot this issue, we added debug logging before and
after the GroupByKey step.

We are using Beam version 2.50 with Flink 1.16.

running with only 1 task manager, 2 slots. parallelism 2. no HA.

any insights or suggestions?

The messages are KV - of String and an Avro message.

PCollection<KV<String, IpSessionInfo>> ipSessionInput = pipeline
    .apply("readIpSessionInfo",
        KafkaTransform.readAvroMessageFromKafka(
            pipelineUtil.getBootstrapServers(),
            options.getSourceKafkaTopic(),
            PIPELINE_NAME,
            IpSessionInfo.class,
            IpSessionInfoDeserializer.class))
    .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
    .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("DebugLogBeforeGroupByKey", ParDo.of(
        new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
            @DoFn.ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, IpSessionInfo> element = c.element();
                log.atInfo().log("Before GroupByKey: " + element);
                c.output(element);
            }
        }))
    .apply(GroupByKey.create())
    .apply("DebugLogAfterGroupByKey", ParDo.of(
        new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
                log.atInfo().log("After GroupByKey: " + groupedElement);
                c.output(groupedElement);
            }
        }))
    .apply(ParDo.of(new ConvertIpSession()));


thanks
Sigalit


Re: [DISCUSS] Drop Euphoria extension

2023-10-16 Thread Jan Lukavský
Sure, that would probably be the preferred way to go. For now, I'm
trying to get some feedback on whether there are real-world users who
might miss the API. Currently, the only value I see is that Euphoria
adds an additional level of indirection for user code. The expansion
goes like this:


 Euphoria Pipeline -> runtime provided translators -> vanilla Beam 
Pipeline -> runner


Hence code written using the Euphoria extension can be modified at runtime
(Pipeline construction time) using dependency injection, which brings
the value that users can modify (typically optimize) Pipelines without
actually modifying the business logic. On the other hand, I'm not sure if
this justifies the complexity of the extension. Were this the only
value, it should be possible to implement such dynamic expansion either
in the Java SDK core or as a separate light-weight extension.


 Jan

On 10/16/23 15:10, Alexey Romanenko wrote:

Can we just deprecate it for a while and then remove completely?

—
Alexey


On 13 Oct 2023, at 18:59, Jan Lukavský  wrote:

Hi,

it has been some time since Euphoria extension [1] has been adopted by Beam as a possible 
"Java 8 API". Beam has evolved from that time a lot, the current API seems 
actually more elegant than the original Euphoria's and last but not least, it has no 
maintainers and no known users. If there are any users, please speak up!

Otherwise I'd like to propose to drop it from codebase, I'll start a vote 
thread during next week, if there are no objections.

Best,

  Jan

[1] https://beam.apache.org/documentation/sdks/java/euphoria/



[DISCUSS] Drop Euphoria extension

2023-10-13 Thread Jan Lukavský

Hi,

it has been some time since the Euphoria extension [1] was adopted by
Beam as a possible "Java 8 API". Beam has evolved a lot since that time,
the current API actually seems more elegant than the original Euphoria's,
and last but not least, it has no maintainers and no known users. If
there are any users, please speak up!


Otherwise I'd like to propose to drop it from codebase, I'll start a 
vote thread during next week, if there are no objections.


Best,

 Jan

[1] https://beam.apache.org/documentation/sdks/java/euphoria/



Re: simplest way to do exponential moving average?

2023-10-02 Thread Jan Lukavský

Hi,

this depends on how exactly you plan to calculate the average. The
original definition is based on an exponentially decreasing weight of more
distant (older, if time is on the x-axis) data points. This (technically)
means that the average at any point X1 depends on all values X0 <= X1.
This would therefore require buffering (using GroupByKey) all elements
in a global window, sorting them manually, and then computing the new
value of the average, triggering after each element. This is probably the
technically correct, but most computationally intensive, variant.


If the average is done over time intervals, then another option could
be to define a cut-off interval T, i.e. set the exponentially vanishing
weight of data points to zero for any T0 < T1 - T. If the
data points come at some discrete time intervals (minutes, hours, days),
then you can split the data into sliding time windows
(the window length being the cut-off interval and the slide being the update
interval) and assign a weight to each data point in the particular time
interval - i.e. how much weight the data point has at the
end of the sliding window. With this you could then use a CombineFn to
sum the weighted values and weights, which would be much more efficient.
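A hedged Java sketch of this second variant (the Signal type, its getValue() accessor, the window sizes and the decay constant are illustrative assumptions, not definitive values): each element is weighted by its age relative to the end of the sliding window it falls into, and a CombineFn sums the weighted values and the weights, dividing them at the end.

    PCollection<Double> ema = signals
        .apply(Window.<Signal>into(
            SlidingWindows.of(Duration.standardHours(1))     // cut-off interval T
                .every(Duration.standardMinutes(5))))        // update (slide) interval
        .apply(ParDo.of(new DoFn<Signal, KV<Double, Double>>() {
          private final double decayMillis = 10 * 60 * 1000;  // illustrative decay constant

          @ProcessElement
          public void process(
              @Element Signal s, @Timestamp Instant ts, BoundedWindow window,
              OutputReceiver<KV<Double, Double>> out) {
            double age = window.maxTimestamp().getMillis() - ts.getMillis();
            double weight = Math.exp(-age / decayMillis);      // exponentially vanishing weight
            out.output(KV.of(weight * s.getValue(), weight));  // (weighted value, weight)
          }
        }))
        .apply(Combine.globally(new WeightedMeanFn()).withoutDefaults());

    static class WeightedMeanFn
        extends Combine.CombineFn<KV<Double, Double>, KV<Double, Double>, Double> {
      @Override public KV<Double, Double> createAccumulator() { return KV.of(0.0, 0.0); }
      @Override public KV<Double, Double> addInput(KV<Double, Double> acc, KV<Double, Double> in) {
        return KV.of(acc.getKey() + in.getKey(), acc.getValue() + in.getValue());
      }
      @Override public KV<Double, Double> mergeAccumulators(Iterable<KV<Double, Double>> accs) {
        double sum = 0, weight = 0;
        for (KV<Double, Double> a : accs) { sum += a.getKey(); weight += a.getValue(); }
        return KV.of(sum, weight);
      }
      @Override public Double extractOutput(KV<Double, Double> acc) {
        return acc.getValue() == 0 ? 0.0 : acc.getKey() / acc.getValue();  // weighted mean
      }
    }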


Best,

 Jan

On 9/30/23 17:08, Balogh, György wrote:

Hi,
I want to calculate the exponential moving average of a signal using 
beam in java.
I understand there is no time order guarantee on incoming data. What 
would be the simplest solution for this?

Thank you,

--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 


Re: "Decorator" pattern for PTramsforms

2023-09-19 Thread Jan Lukavský
This applies to sources and cannot be used inside the Pipeline.
Essentially, what would be needed is support for a back edge from a
PTransform to source(s) to notify the source PTransform(s) to terminate.
This is also essential for any possible support for iterations (which
would probably be implemented as a "decorator" around a PTransform; the
"limit" is essentially only a special case of a single iteration). I
don't want to discuss the details specifically here, but if we want
to add support for decorators, we should keep in mind that there
is something missing in the model for full support.


We can open a different discussion thread if there is any interest.

Best,

 Jan

On 9/18/23 17:19, Alexey Romanenko wrote:
In the past it was BoundedReadFromUnboundedSource, which is still, if I'm
not mistaken, used in KafkaIO to limit reads by number of records or time.
Though at the same time we had a discussion that it should not be used
anymore and is considered an obsolete transform.



On 18 Sep 2023, at 09:28, Jan Lukavský  wrote:

Do we have a defined way for a PTransform to create bounded 
PCollection from an unbounded one (a typical example would be LIMIT 
acting on unbounded input)? AFAIK, we can use SDF to manipulate 
watermark, but that requires terminating the Pipeline even though 
there are still upstream running transforms (e.g. sources). I'm not 
sure if we have a sound definition of when a runner should terminate 
a Pipeline, so I guess this is runner dependent, right? If I'm not 
wrong, for example Flink does not terminate Pipeline until there is 
at least one running operator, so this might require signalling 
sources from sink (thus introducing some form of cycle).


 Jan

On 9/15/23 18:55, Robert Bradshaw via user wrote:
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user 
 wrote:


Creating composite DoFns is tricky today due to how they are
implemented (via annotated methods).


Note that this depends on the language. This should be really easy 
to do from Python.


However providing such a method to compose DoFns would be very
useful IMO.


+1

On Fri, Sep 15, 2023 at 9:33 AM Joey Tran
 wrote:

Yeah for (1) the concern would be adding a shuffle/fusion
break and (2) sounds like the likely solution, was just
hoping there'd be one that could wrap at the PTransform
level but I realize now the PTransform abstraction is too
general as you mentioned to do something like that.

(2) will be likely what we do, though now I'm wondering if
it might be possible to create a ParDo wrapper that can take
a ParDo, extract it's dofn, wrap it, and return a new ParDo

On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user
 wrote:

+1 to looking at composite transforms. You could even
have a composite transform that takes another transform
as one of its construction arguments and whose expand
method does pre- and post-processing to the
inputs/outputs before/after applying the transform in
question. (You could even implement this as a Python
decorator if you really wanted, either decorating the
expand method itself or the full class...)

One of the difficulties is that for a general transform
there isn't necessarily a 1:N relationship between
outputs and inputs as one has for a DoFn (especially if
there is any aggregation involved). There are, however,
two partial solutions that might help.

(1) You can do a CombineGlobally with a CombineFn (Like
Sample) that returns at most N elements. You could do
this with a CombinePerKey if you can come up with a
reasonable key (e.g. the id of your input elements) that
the limit should be a applied to. Note that this may
cause a lot of data to be shuffled (though due to
combiner lifting, no more than N per bundle).

(2) You could have a DoFn that limits to N per bundle by
initializing a counter in its start_bundle and passing
elements through until the counter reaches a threshold.
(Again, one could do this per id if one is available.)
It wouldn't stop production of the elements, but if
things get fused it would still likely be fairly cheap.

Both of these could be prepended to the problematic
consuming PTransform as well.

- Robert



On Fri, Sep 15, 2023 at 8:13 AM Joey Tran
 wrote:

I'm aware of composite transforms and of the
distributed nature of PTransforms. I'm not
suggesting limiting the entire set and my example
was more illustrative than the actual use c

Re: "Decorator" pattern for PTramsforms

2023-09-18 Thread Jan Lukavský
Do we have a defined way for a PTransform to create a bounded PCollection
from an unbounded one (a typical example would be LIMIT acting on
unbounded input)? AFAIK, we can use SDF to manipulate the watermark, but
that requires terminating the Pipeline even though there are still
upstream running transforms (e.g. sources). I'm not sure if we have a
sound definition of when a runner should terminate a Pipeline, so I
guess this is runner dependent, right? If I'm not wrong, for example
Flink does not terminate a Pipeline as long as there is at least one running
operator, so this might require signalling sources from the sink (thus
introducing some form of cycle).


 Jan

On 9/15/23 18:55, Robert Bradshaw via user wrote:
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user 
 wrote:


Creating composite DoFns is tricky today due to how they are
implemented (via annotated methods).


Note that this depends on the language. This should be really easy to 
do from Python.


However providing such a method to compose DoFns would be very
useful IMO.


+1

On Fri, Sep 15, 2023 at 9:33 AM Joey Tran
 wrote:

Yeah for (1) the concern would be adding a shuffle/fusion
break and (2) sounds like the likely solution, was just hoping
there'd be one that could wrap at the PTransform level but I
realize now the PTransform abstraction is too general as you
mentioned to do something like that.

(2) will be likely what we do, though now I'm wondering if it
might be possible to create a ParDo wrapper that can take a
ParDo, extract it's dofn, wrap it, and return a new ParDo

On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user
 wrote:

+1 to looking at composite transforms. You could even have
a composite transform that takes another transform as one
of its construction arguments and whose expand method does
pre- and post-processing to the inputs/outputs
before/after applying the transform in question. (You
could even implement this as a Python decorator if you
really wanted, either decorating the expand method itself
or the full class...)

One of the difficulties is that for a general transform
there isn't necessarily a 1:N relationship between outputs
and inputs as one has for a DoFn (especially if there is
any aggregation involved). There are, however, two partial
solutions that might help.

(1) You can do a CombineGlobally with a CombineFn (Like
Sample) that returns at most N elements. You could do this
with a CombinePerKey if you can come up with a reasonable
key (e.g. the id of your input elements) that the limit
should be a applied to. Note that this may cause a lot of
data to be shuffled (though due to combiner lifting, no
more than N per bundle).

(2) You could have a DoFn that limits to N per bundle by
initializing a counter in its start_bundle and passing
elements through until the counter reaches a threshold.
(Again, one could do this per id if one is available.) It
wouldn't stop production of the elements, but if things
get fused it would still likely be fairly cheap.

Both of these could be prepended to the problematic
consuming PTransform as well.
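A hedged Java sketch of option (2), a generic per-bundle limiter that can be prepended to the expensive transform (the name LimitPerBundle and the threshold are made up; depending on how it is applied, an explicit output coder may be needed):

    class LimitPerBundle<T> extends DoFn<T, T> {
      private final long limit;
      private transient long count;

      LimitPerBundle(long limit) { this.limit = limit; }

      @StartBundle
      public void startBundle() { count = 0; }

      @ProcessElement
      public void process(@Element T element, OutputReceiver<T> out) {
        if (count++ < limit) {
          out.output(element);  // pass elements through until the per-bundle threshold is reached
        }                       // beyond the threshold, elements of this bundle are dropped
      }
    }

    // usage: input.apply(ParDo.of(new LimitPerBundle<>(1_000)))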

- Robert



On Fri, Sep 15, 2023 at 8:13 AM Joey Tran
 wrote:

I'm aware of composite transforms and of the
distributed nature of PTransforms. I'm not suggesting
limiting the entire set and my example was more
illustrative than the actual use case.

My actual use case is basically: I have multiple
PTransforms, and let's say most of them average ~100
generated outputs for a single input. Most of these
PTransforms will occasionally run into an input though
that might output maybe 1M outputs. This can cause
issues if for example there are transforms that follow
it that require a lot of compute per input.

The simplest way to deal with this is to modify the
`DoFn`s in our Ptransforms and add a limiter in the
logic (e.g. `if num_outputs_generated >=
OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate
this logic across our transforms, but it'd be much
cleaner if we could lift up this limiting logic out of
the application logic and have some generic wrapper
that extends our transforms.

Thanks for the discussion!

On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko
 wrote:

  

Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

2023-08-15 Thread Jan Lukavský

Hi Kapil,

if you don't have a special reason for running the jobserver manually,
you can let the Beam Python SDK run it for you (and configure it
accordingly). You just need to pass `--runner=flink` (and
--flink_master) to your flink_options (or via the command line). As Sam
suggested, it would be good to try to run a minimal pipeline without Kafka
(which brings in the complexity of a cross-language pipeline) to see if the
problem is with the jobserver or the expansion service.


I also have a working repo with k8s (minikube), flink and kafka - 
dockerfile: [1] (somewhat older versions), deployment: [2], example 
pipeline: [3].


Best,

 Jan


[1] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile


[2] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/manifests/flink.yaml


[3] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/max_word_length.py


On 8/14/23 22:12, Sam Bourne wrote:

Hey Kapil,

I grappled with a similar deployment and created this repo 
 [1] to attempt to provide 
others with some nuggets of useful information. We were running cross 
language pipelines on flink connecting PubsubIO 
 [2] 
to other misc python transforms. No promises it will help, but feel 
free to take a look as it's a close approximation to the setup that we 
had working.


Your particular error seems related to the Kafka transform. Does a 
pure python pipeline execute as expected?


[1] https://github.com/sambvfx/beam-flink-k8s
[2] 
https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171



On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user 
 wrote:


Not the OP, but is it possible to join the slack channel without
an apache.org  email address? I tried joining
slack previously for support and gave up because it looked like it
wasn't.

On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles 
wrote:

There is a slack channel linked from
https://beam.apache.org/community/contact-us/ it is #beam on
the-asf.slack.com 

(you find this via beam.apache.org  ->
Community -> Contact Us)

It sounds like an issue with running a multi-language pipeline
on the portable flink runner. (something which I am not
equipped to help with in detail)

Kenn

On Wed, Aug 9, 2023 at 2:51 PM kapil singh
 wrote:

Hey,

I've been grappling with this issue for the past five days
and, despite my continuous efforts, I haven't found a
resolution. Additionally, I've been unable to locate a
Slack channel for Beam where I might seek assistance.

issue

*RuntimeError: Pipeline construction environment and
pipeline runtime environment are not compatible. If you
use a custom container image, check that the Python
interpreter minor version and the Apache Beam version in
your image match the versions used at pipeline
construction time. Submission environment:
beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
Runtime environment:
beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*



Here is what I am trying to do:

I am running the job from a Kubernetes container that hits the
job server and then the job manager and task manager;
the task manager and job manager are one container.

Here is my custom Dockerfile, name: custom-flink

# Starting with the base Flink image
FROM apache/flink:1.16-java11
ARG FLINK_VERSION=1.16
ARG KAFKA_VERSION=2.8.0

# Install python3.8 and its associated dependencies,
followed by pyflink
RUN set -ex; \
apt-get update && \
apt-get install -y build-essential libssl-dev zlib1g-dev
libbz2-dev libffi-dev lzma liblzma-dev && \
wget
https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
tar -xvf Python-3.8.0.tgz && \
cd Python-3.8.0 && \
./configure --without-tests --enable-shared && \
make -j4 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
apt-get clean && 

Re: Count Incoming Unbounded records using Apache Beam java

2023-07-14 Thread Jan Lukavský

Hi,
thanks for your interest in Apache Beam. I answered your question, see [1]

Best,
 Jan

[1] 
https://stackoverflow.com/questions/76681800/count-incoming-unbouned-messages-from-pubsub-using-apache-beam-java/76685799#76685799


On 7/13/23 19:17, Phani Geeth wrote:


Hi,

I am trying to count the incoming messages from PubSub using a Beam
pipeline with the Java SDK.


class Display extends DoFn<Long, Void> {
    @ProcessElement
    public void process(ProcessContext c) {
        System.out.println(c.element());
    }
}

public class CustomMetrics {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(PubsubIO.readStrings().fromSubscription("PubSub Subscription"))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(DefaultTrigger.of()))
            .apply(Combine.globally(Count.combineFn()).withoutDefaults())
            .apply(ParDo.of(new Display()));

        pipeline.run().waitUntilFinish();
    }
}

I am able to count the incoming records, but the number is not displayed
immediately; it is populated seemingly at random after some time.


Is there any other way to count the incoming unbounded records using an
Apache Beam pipeline?


https://stackoverflow.com/questions/76681800/count-incoming-unbouned-messages-from-pubsub-using-apache-beam-java

Thanks,

Phani Geeth

Sent from Mail  for 
Windows


Re: [Question] check if pipeline is still running in pipeline runner

2023-07-10 Thread Jan Lukavský

Hi,

when the JM goes down, it should be brought up again (if configured as HA,
running on k8s, ...), and it should recover all running jobs. If this does
not happen, then it means that:


 a) either the JM is not in an HA configuration, or

 b) it is unable to recover after failure, which typically means that
there is some problem reading the jobs' metadata from external storage, or
some other persistent error


Either way, monitoring the job for simple presence in the JM might not
be sufficient. What I personally prefer is monitoring the lag of the job's
output watermark behind current processing time. It is possible to do
this at Beam's application level, by using a looping event-time timer
that outputs the current input watermark every T seconds. This can then
capture even the case when the job is running but is unable to
make any progress.
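A hedged sketch of such a "heartbeat" in the Java SDK (the 10-second period and the key/value types and names are illustrative assumptions); the emitted instants can then be compared with processing time, e.g. exported as a metric, to alert on a growing lag:

    class WatermarkHeartbeat extends DoFn<KV<String, String>, Instant> {
      private static final Duration PERIOD = Duration.standardSeconds(10);

      @StateId("started")
      private final StateSpec<ValueState<Boolean>> startedSpec = StateSpecs.value(BooleanCoder.of());

      @TimerId("loop")
      private final TimerSpec loopSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          @Timestamp Instant ts,
          @StateId("started") ValueState<Boolean> started,
          @TimerId("loop") Timer loop) {
        if (!Boolean.TRUE.equals(started.read())) {
          started.write(true);
          loop.set(ts.plus(PERIOD));  // bootstrap the loop from the first element seen for this key
        }
      }

      @OnTimer("loop")
      public void onLoop(
          @Timestamp Instant firedAt,
          @TimerId("loop") Timer loop,
          OutputReceiver<Instant> out) {
        out.output(firedAt);             // roughly the current position of the input watermark
        loop.set(firedAt.plus(PERIOD));  // re-arm the looping timer
      }
    }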


Best,

 Jan

On 7/7/23 23:34, Adlae D'Orazio wrote:

Hi Jan,

Thank you for your response! Apologies that this wasn't clear, but
we're actually looking at what would happen if the job server were
to go down. So what we are more interested in is understanding how
to actually monitor that the job is running. We won't know the job id,
so we can't use that to query the REST API. Like I said, we were
looking into that method that initializes an AppName, but that was
written in Java. What do you think we should do? Thank you so much for
your help!


Best,

Adlae D'Orazio

On Fri, Jul 7, 2023 at 1:28 AM Jan Lukavský  wrote:

Hi,

if I understand correctly, you have a 'program runner' (sometimes
called a driver), which is supposed to be long-running and
watching if the submitted Pipeline runs or not. If not, then the
driver resubmits the job. If my understanding is correct, I would
suggest looking into the reasons why the pipeline terminates in
the first place. Flink is designed to ensure that after job
submission it is fault-tolerant for both application-level errors
(e.g. transient user code errors, external dependencies failures,
etc) and the Flink runtime itself (failures of taskmanagers or
jobmanager). The most often case when this does not work is some
sort of misconfiguration (typically inability to restore jobs
after failure of jobmanager). Having said that it is good idea to
_monitor_ that your job runs (and ideally that it makes progress,
because the pure fact that job 'runs' does not imply that), but it
should require manual action in case the job is permanently gone.
Simple resubmission of the job is not what I would expect to work
well.

Best,

 Jan

On 7/6/23 22:07, Adlae D'Orazio wrote:


Hello,


I am using an Apache Flink cluster to run a streaming pipeline
that I've created using Apache Beam. This streaming pipeline
should be the only one of its type running on the Flink cluster,
and I need some help with how to ensure that is the case.


A Dockerized pipeline runner program submits the streaming
pipeline, and if the pipeline exits (i.e. because of an error),
then the pipeline runner program exits and is re-run, so that the
pipeline is submitted again and continues running.


The problem I am running into is that if the pipeline runner
program exits, but the streaming pipeline is still running (i.e.
because the job server went down and came back up), then I need
to check in the pipeline runner program whether or not the
pipeline is still running, or if it has gone down.


My first thought was to try to create a specific job name that
would be stored in Flink's REST API, and then to see if the job
was already running, I could query the REST API for that name.
I'm having trouble doing this. I seem to be able to set a job
name in Beam, but that job name does not seem to be accessible
via Flink’s REST API once the pipeline is run using Flink. From
researching this problem, I found this

<https://github.com/apache/beam/blob/9a11e28ce79e3b243a13fbf148f2ba26b8c14107/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java#L340>method,
which initializes an AppName. This seems promising to me, but it
is written in Java and I am looking to do it in Python.


Is there a way to specify the Flink job name via the Beam Python
SDK? Or is there a simpler way to know that a particular Beam
pipeline is running, and therefore not resubmit it?


Please let me know if you have any suggestions - either about how
to execute the approaches I've described or if there's a simpler
solution that I am overlooking. Thank you for your help!


Best,

Adlae D'Orazio



Re: [Question] check if pipeline is still running in pipeline runner

2023-07-07 Thread Jan Lukavský

Hi,

if I understand correctly, you have a 'program runner' (sometimes called 
a driver), which is supposed to be long-running and watching if the 
submitted Pipeline runs or not. If not, then the driver resubmits the 
job. If my understanding is correct, I would suggest looking into the 
reasons why the pipeline terminates in the first place. Flink is 
designed to ensure that after job submission it is fault-tolerant for 
both application-level errors (e.g. transient user code errors, external
dependency failures, etc.) and the Flink runtime itself (failures of
taskmanagers or the jobmanager). The most common case when this does not work
is some sort of misconfiguration (typically an inability to restore jobs
after a failure of the jobmanager). Having said that, it is a good idea to
_monitor_ that your job runs (and ideally that it makes progress,
because the pure fact that a job 'runs' does not imply that), but it
should require manual action in case the job is permanently gone. Simple
resubmission of the job is not what I would expect to work well.


Best,

 Jan

On 7/6/23 22:07, Adlae D'Orazio wrote:


Hello,


I am using an Apache Flink cluster to run a streaming pipeline that 
I've created using Apache Beam. This streaming pipeline should be the 
only one of its type running on the Flink cluster, and I need some 
help with how to ensure that is the case.



A Dockerized pipeline runner program submits the streaming pipeline, 
and if the pipeline exits (i.e. because of an error), then the 
pipeline runner program exits and is re-run, so that the pipeline is 
submitted again and continues running.



The problem I am running into is that if the pipeline runner program 
exits, but the streaming pipeline is still running (i.e. because the 
job server went down and came back up), then I need to check in the 
pipeline runner program whether or not the pipeline is still running, 
or if it has gone down.



My first thought was to try to create a specific job name that would 
be stored in Flink's REST API, and then to see if the job was already 
running, I could query the REST API for that name. I'm having trouble 
doing this. I seem to be able to set a job name in Beam, but that job 
name does not seem to be accessible via Flink’s REST API once the 
pipeline is run using Flink. From researching this problem, I found 
this 
method, 
which initializes an AppName. This seems promising to me, but it is 
written in Java and I am looking to do it in Python.



Is there a way to specify the Flink job name via the Beam Python SDK? 
Or is there a simpler way to know that a particular Beam pipeline is 
running, and therefore not resubmit it?



Please let me know if you have any suggestions - either about how to 
execute the approaches I've described or if there's a simpler solution 
that I am overlooking. Thank you for your help!



Best,

Adlae D'Orazio



Re: Is Flink >1.14 really supported by the runners?

2023-06-13 Thread Jan Lukavský

Probably better for dev@ <mailto:d...@beam.apache.org> (added).

 Jan

On 6/13/23 12:43, Edgar H wrote:

Got you, thanks!

Are there any plans on supporting 1.17 anytime soon too?

El mar, 13 jun 2023, 12:27, Jan Lukavský  escribió:

Hi Edgar,

the website seems to be mistakenly not updated when the support for
Flink versions was added. This should be fixed [1], the runner is
stable
on versions up to 1.16.

Regarding Flink Operator, I'm not 100% familiar with it, but given
that
Beam Pipeline is translated into standard Flink application, Beam
should
be able to run on the operator just like any other application.

Best,

  Jan

[1] https://github.com/apache/beam/issues/27115

On 6/13/23 12:10, Edgar H wrote:
> Hi all,
>
> I've been running Beam with Flink in 1.14 for quite some time
now but
> just seen in Maven that runners supporting 1.15 and 1.16 are
available
> to use, however no mention to them within the compatibility matrix.
>
> Are they stable enough to be used?
>
> And also, https://issues.apache.org/jira/browse/BEAM-14538
seeing this
> issue and wondering, does Beam really need to support the operator
> usage or is it something not related to?
>
> Thanks!


Re: Is Flink >1.14 really supported by the runners?

2023-06-13 Thread Jan Lukavský

Hi Edgar,

the website seems to have mistakenly not been updated when the support for
newer Flink versions was added. This should be fixed [1]; the runner is stable
on versions up to 1.16.


Regarding Flink Operator, I'm not 100% familiar with it, but given that 
Beam Pipeline is translated into standard Flink application, Beam should 
be able to run on the operator just like any other application.


Best,

 Jan

[1] https://github.com/apache/beam/issues/27115

On 6/13/23 12:10, Edgar H wrote:

Hi all,

I've been running Beam with Flink 1.14 for quite some time now but have
just seen in Maven that runners supporting 1.15 and 1.16 are available
to use; however, there is no mention of them within the compatibility matrix.


Are they stable enough to be used?

Also, seeing this issue, https://issues.apache.org/jira/browse/BEAM-14538,
I am wondering: does Beam really need to support the operator
usage, or is it something unrelated?


Thanks!


Re: [Exception] Output timestamps must be no earlier than the timestamp of the current input or timer.

2023-05-29 Thread Jan Lukavský

Hi Mario,

PubSub allows storing record metadata in "attributes". You can write
the timestamp into an attribute of your choice and then pass the name of
this attribute into the Read transform [1]. This will cause PubsubIO to
assign timestamps based on the real time when the event occurred.
Otherwise, PubsubIO uses the current timestamp when the event was written to
PubSub, which causes the issue you described.
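A minimal sketch of this approach, assuming the publisher stores the event time in an attribute named "ts" (either milliseconds since epoch or RFC 3339, the formats PubsubIO accepts for a timestamp attribute):

    PCollection<String> messages = pipeline.apply("ReadFromPubsub",
        PubsubIO.readStrings()
            .fromTopic(topic)
            // element timestamps and the watermark are derived from this attribute
            .withTimestampAttribute("ts"));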


An alternative approach is to somehow "fix" the timestamps after PubsubIO
has assigned them. This is more involved; it would (ideally) require a
stateful DoFn that buffers input events and sets up a looping timer to
periodically flush buffered events with the actual timestamp. The
timer would have to use `withOutputTimestamp` [2] to specify the
timestamp that will be used as output when the timer fires. Obviously,
this would have to be the minimal timestamp of all buffered elements.
Moreover, this approach might require an estimate of how distant
the actual timestamp of the event is from the time it is written to
PubSub, because otherwise you would probably produce many late events.
Though this approach is not super complicated, it essentially emulates the
logic of watermark computation, and I would try to avoid it.


The last option is to disable the exception by overriding
getAllowedTimestampSkew() (returning a huge Duration), but that only
fixes the exception, not the problem that this causes in the Pipeline.
The result would be elements that can be arbitrarily late behind the
watermark and thus might be silently dropped under some conditions.
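For completeness, a sketch of that last option (the API is deprecated and, as noted above, late elements may still be silently dropped downstream):

    public static class SetTimestampFn extends DoFn<String, String> {
      @Override
      public Duration getAllowedTimestampSkew() {
        return Duration.millis(Long.MAX_VALUE);  // effectively unlimited backward skew
      }
      // @ProcessElement as in the example further below
    }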


Best,
 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
[2] 
https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-


On 5/26/23 12:08, Mário Costa via user wrote:

Hi,

I need to process messages/events from Google Pub/Sub. The message is
sent as a JSON payload and contains a JSON attribute, say "time", with
the timestamp value of the event.


I need to group the events into 5 minute windows and write them to 
files, one file per window.


After I extract the timestamp and set it in the pipeline I get an 
exception message:


java.lang.IllegalArgumentException: Cannot output with timestamp 
2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than 
the timestamp of the current input or timer (2023-05-25T16:40:00.039Z) 
minus the allowed skew (0 milliseconds) and no later than 
294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() 
Javadoc for details on changing the allowed skew.


Is there a way to solve this problem?

How can I override the timestamp of the event without having this issue ?

Follows an example of code of the pipeline:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PubsubTimestampExample {
    public static void main(String[] args) {
        // Create the pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Define the Pub/Sub topic and subscription
        String topic = "projects//topics/";

        // Read the messages from Pub/Sub with a timestamp attribute
        PCollection<String> messages = pipeline
                .apply("ReadFromPubsub", 
PubsubIO.readStrings().fromTopic(topic)

        );

        // Process the messages and set the timestamp
        PCollection<String> processedMessages = messages
                .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));

        // Print the processed messages
        processedMessages.apply("PrintMessages", ParDo.of(new 
PrintMessagesFn()));


        // Run the pipeline
        pipeline.run();
    }

    public static class SetTimestampFn extends DoFn<String, String> {
        private static final DateTimeFormatter TIMESTAMP_FORMATTER = 
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");


        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = c.element();
            String[] parts = message.split(",");  // Assuming message 
format: "payload,timestamp"

            String payload = parts[0];
            String timestampString = parts[1];

            // Extract and parse the timestamp from the payload
            Instant timestamp = Instant.parse(timestampString, 
TIMESTAMP_FORMATTER);


            // Set the timestamp for the element
            c.outputWithTimestamp(payload, timestamp);

Re: [Question] - Time series - cumulative sum in right order with python api in a batch process

2023-04-25 Thread Jan Lukavský

Hi,

there is a (rather old and long) discussion of this for the Java SDK in [1]. 
That discussion resulted in adding the @RequiresTimeSortedInput annotation 
[2]. Unfortunately, this has probably not been ported to the Python SDK.


I'll sum up reasons why it was added:

 a) inputs to stateful DoFn are naturally unsorted

 b) batch Pipelines have two options for how to feed data to a stateful DoFn:

  b1) unsorted, fed as the data arrives

  b2) explicitly sorted by timestamp (or by a correlated field in the data, 
e.g. a sequential index, if provided)


In case b1) there is no way to move the watermark before *all* data is read 
from the input - moving the watermark earlier might produce late data with 
arbitrary lateness (which would be a consistency bug).


In case b2) it would be possible to advance the watermark (and thus fire 
event-time timers).


The case b2) was decided to be too restrictive to be added to the model 
as a requirement for batch pipelines - which is totally reasonable. We 
are therefore left with b1), which means that any use case like yours 
requires first reading the complete batch data into state, then manually 
sorting it, and only then processing the ordered data. 
This requires a lot of coding (which could be wrapped into a reusable 
PTransform, for sure), but it is also inefficient, because a pure batch 
runner is likely to perform a merge-sort grouping anyway. It only needs to 
know that it should keep this guarantee for the DoFn (and add the timestamp 
to the sorting key). This is the reason why the annotation was 
introduced - to keep the Beam model as flexible as possible while 
enabling runners to make use of the sorting they already do anyway, in case 
it is needed by the application logic.
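
For reference, in the Java SDK the resulting DoFn looks roughly like the 
following sketch (a per-key running sum; the types and names are just for 
illustration):

import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class CumulativeSumFn extends DoFn<KV<String, Double>, KV<String, Double>> {

  @StateId("sum")
  private final StateSpec<ValueState<Double>> sumSpec = StateSpecs.value(DoubleCoder.of());

  // The runner must deliver elements to this method sorted by event timestamp.
  @RequiresTimeSortedInput
  @ProcessElement
  public void processElement(
      @Element KV<String, Double> element,
      @StateId("sum") ValueState<Double> sum,
      OutputReceiver<KV<String, Double>> out) {

    Double current = sum.read();
    double running = (current == null ? 0.0 : current) + element.getValue();
    sum.write(running);
    out.output(KV.of(element.getKey(), running));
  }
}

Applied via ParDo.of(new CumulativeSumFn()) on a keyed PCollection, the running 
sum is then computed in event-time order per key, without any manual buffering.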


I think that until equivalent information can be provided by a DoFn in the 
Python SDK, the only option is buffering and manually sorting the 
complete data set (broken down per key).


 Jan

[1] https://lists.apache.org/thread/7ryqg3bm1c3bs7g1nk4krnrjxlkd7srn
[2] 
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html


On 4/24/23 18:35, Guagliardo, Patrizio via user wrote:

Ok great, so what I did at the end was:

def cumulative_sums(key, timestamped_values):
    running = 0
    for x in sorted(timestamped_values, key=lambda x: x[1]):
        running += x[0]
        yield key, running


with beam.Pipeline() as p:
    sums = (p
            | 'Create' >> beam.Create([
                (3.1, 3),
                (4.2, 4),
                (5.4, 5),
                (2.3, 2),
                (1.5, 6)
            ])
            | 'AddTimestamps' >> beam.Map(
                lambda x: beam.transforms.window.TimestampedValue(x, x[1]))
            | beam.Map(lambda x: ('key', x))
            | 'Window' >> beam.WindowInto(FixedWindows(11))
            | beam.GroupByKey()
            | beam.FlatMapTuple(cumulative_sums)
            | 'Print' >> beam.Map(print))

However, I am asking whether there is a way to carry state from one window to 
another. I would like to do other transformations as well where, for example, you 
take a value from one time step to the next - calculate time differences, fill in 
missing values (taking the value from the previous time step), and so on. Can that 
be done? I have read something about looping timers, but could not get the details 
in Python. Is there a way to do this?

Thanks a lot.

-Original Message-
From: Robert Bradshaw 
Sent: lunes, 24 de abril de 2023 18:00
To: user@beam.apache.org; Guagliardo, Patrizio 

Subject: Re: [Question] - Time series - cumulative sum in right order with 
python api in a batch process


CAUTION: This email originated outside the company. Do not click links or open 
attachments unless you are expecting them from the sender.



You are correct in that the data may arrive in an unordered way.
However, once a window finishes, you are guaranteed to have seen all the data 
up to that point (modulo late data) and can then confidently compute your 
ordered cumulative sum.

You could do something like this:

def cumulative_sums(key, timestamped_values):
    running = 0
    for _, x in sorted(timestamped_values):
        running += x
        yield key, running

sums = (timestamped_data
    | beam.Map(lambda x, t=DoFn.TimestampParam: ('key', (t, x)))
    | beam.WindowInto(...)
    | beam.GroupByKey()
    | beam.FlatMapTuple(cumulative_sums))



On Mon, Apr 24, 2023 at 8:23 AM Guagliardo, Patrizio via user 
 wrote:

Hi together,



I want to create a cumulative sum over a time series in bounded batch processing 
in Apache Beam with the Python API. What you can do is write a cumulative sum with 
a stateful DoFn, but the problem you would still face is that you cannot handle it 
this way when the data is unordered, which is the case in a PCollection. Is there a 
way to compute the cumulative sum over time in a batch process? This is what I did 
(without ordering):

import apache_beam as beam

from apache_beam import TimeDomain

from apache_beam.transforms.userstate import ReadModifyWriteStateSpec,
TimerSpec, CombiningValueStateSpec

fro

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Jan Lukavský

Hi Ning,

I might have missed it in the discussion, but we are talking about batch 
execution, am I right? In streaming, all operators (PTransforms) of a 
Pipeline run in the same slots, so the downsides are limited. You 
can enforce streaming mode using the --streaming command-line argument, but 
yes, this might have other implications. For batch only, it obviously 
makes sense to limit the parallelism of a (fused) 'stage', which is not a 
transform-level concept, but rather a more complex union of transforms 
divided by shuffle barriers. Would you be willing to start a follow-up 
thread on the dev@ mailing list for a deeper discussion?


 Jan

On 4/20/23 19:18, Ning Kang via user wrote:

Hi Jan,

The approach works when your pipeline doesn't have too many operators. 
And the operator that needs the highest parallelism can only use at 
most #total_task_slots / #operators resources available in the cluster.


Another downside is wasted resources for other smaller operators who 
cannot make full use of task slots assigned to them. You might see 
only 1/10 tasks running while the other 9/10 tasks idle for an 
operator with parallelism 10, especially when it's doing some 
aggregation like a SUM.


One redeeming method is that, for operators following another operator 
with high fanout, we can explicitly add a Reshuffle to allow a higher 
parallelism. But this circles back to the first downside: if your 
pipeline has exponentially high fanout through it, setting a single 
parallelism for the whole pipeline is not ideal because it limits the 
scalability of your pipeline significantly.


Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:

Hi,

this topic was discussed many years ago and the conclusion there
was that setting the parallelism of individual operators via
FlinkPipelineOptions (or ResourceHints) would be possible, but
somewhat cumbersome. Although I understand that it "feels"
weird to have high parallelism for operators with small inputs,
does this actually bring any relevant performance impact? I always
use parallelism based on the largest operator in the Pipeline and
this seems to work just fine. Is there any particular need or
measurable impact of such approach?

 Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:

Same need here, using Flink runner. We are processing a
pcollection (extracting features per element) then combining
these into groups of features and running the next operator on
those groups.

Each group contains ~50 elements, so the parallelism of the
operator upstream of the groupby should be higher, to be balanced
with the downstream operator.

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:

Hi Reuven,

It would be better to set parallelism for operators, as I
mentioned before, there may be multiple groupby, join
operators in one pipeline, and their parallelism can be
different due to different input data sizes.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax 
wrote:

Jeff - does setting the global default work for you, or
do you need per-operator control? Seems like it would be
to add this to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
 wrote:

Yeah, I don't think we have a good per-operator API
for this. If we were to add it, it probably belongs
in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
 wrote:

Looking at FlinkPipelineOptions, there is a
parallelism option you can set. I believe this
sets the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang
 wrote:

Thanks Holden, this would work for Spark, but
Flink doesn't have such kind of mechanism, so
I am looking for a general solution on the
beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau
 wrote:

To a (small) degree Sparks “new” AQE
might be able to help depending on what
kind of operations Beam is compiling it
down to.

Have you tried setting
spark.sql.adaptive.enabled &
spark.sql.adaptive.coalescePartitions.enabled



On Mon, Apr 17, 2023 at 10:34 AM Reuven
Lax via user  wrote:

I see. Robert - what is the story for
parallelism controls on GBK with the
  

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Jan Lukavský

Hi,

this topic was discussed many years ago and the conclusion there was 
that setting the parallelism of individual operators via 
FlinkPipelineOptions (or ResourceHints) would be possible, but 
somewhat cumbersome. Although I understand that it "feels" weird to have 
high parallelism for operators with small inputs, does this actually 
have any relevant performance impact? I always use parallelism based on 
the largest operator in the Pipeline and this seems to work just fine. 
Is there any particular need or measurable impact of such an approach?
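
For reference, setting that pipeline-wide default looks roughly like this 
(a sketch; the value 16 is arbitrary):

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Default parallelism applied to all operators of the translated Flink job.
options.setParallelism(16);
Pipeline pipeline = Pipeline.create(options);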


 Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:
Same need here, using Flink runner. We are processing a pcollection 
(extracting features per element) then combining these into groups of 
features and running the next operator on those groups.


Each group contains ~50 elements, so the parallelism of the operator 
upstream of the groupby should be higher, to be balanced with the 
downstream operator.


On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:

Hi Reuven,

It would be better to set parallelism for operators, as I
mentioned before, there may be multiple groupby, join operators in
one pipeline, and their parallelism can be different due to
different input data sizes.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax  wrote:

Jeff - does setting the global default work for you, or do you
need per-operator control? Seems like it would be to add this
to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
 wrote:

Yeah, I don't think we have a good per-operator API for
this. If we were to add it, it probably belongs in
ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
 wrote:

Looking at FlinkPipelineOptions, there is a
parallelism option you can set. I believe this sets
the default parallelism for all Flink operators.

On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang
 wrote:

Thanks Holden, this would work for Spark, but
Flink doesn't have such kind of mechanism, so I am
looking for a general solution on the beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau
 wrote:

To a (small) degree Sparks “new” AQE might be
able to help depending on what kind of
operations Beam is compiling it down to.

Have you tried setting
spark.sql.adaptive.enabled &
spark.sql.adaptive.coalescePartitions.enabled



On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax
via user  wrote:

I see. Robert - what is the story for
parallelism controls on GBK with the Spark
or Flink runners?

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang
 wrote:

No, I don't use dataflow, I use Spark
& Flink.


On Mon, Apr 17, 2023 at 8:08 AM Reuven
Lax  wrote:

Are you running on the Dataflow
runner? If so, Dataflow - unlike
Spark and Flink - dynamically
modifies the parallelism as the
operator runs, so there is no need
to have such controls. In fact
these specific controls wouldn't
make much sense for the way
Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM
Jeff Zhang  wrote:

Just for
performance tuning like in
Spark and Flink.


On Sun, Apr 16, 2023 at
1:10 PM Robert Bradshaw via
user  wrote:

What are you trying to
achieve by setting the
parallelism?

On Sat, Apr 15, 2023 at
5:13 PM Jeff Zhang
 wrote:

Thanks Reuven, what I
mean is to set the

Re: Join streams with different frequencies

2023-01-04 Thread Jan Lukavský

Hi,

the general pattern here would be to map both PCollections to a 
common type keyed by ID (e.g. PCollection<KV<String, CommonType>> for some 
union type CommonType), flatten them into one PCollection, and then apply a 
stateful DoFn, see [1]. You would hold the DataY value of each ID in state 
and match it against events coming from the DataX stream. Under the 
assumption that you do not need to ensure that each DataX event is matched 
against the *exactly preceding* DataY event (in event time), this works fine.
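
A minimal sketch of that stateful DoFn; the common representation 
KV<id, KV<tag, payload>> with tags "X" and "Y" is only an assumption for 
illustration - in practice you would use your own union type and coder:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class MatchLatestYFn
    extends DoFn<KV<String, KV<String, String>>, KV<String, String>> {

  @StateId("latestY")
  private final StateSpec<ValueState<String>> latestYSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, KV<String, String>> element,
      @StateId("latestY") ValueState<String> latestY,
      OutputReceiver<KV<String, String>> out) {

    String tag = element.getValue().getKey();
    String payload = element.getValue().getValue();

    if ("Y".equals(tag)) {
      // Remember the last DataY seen for this ID (no ordering guarantee, see below).
      latestY.write(payload);
      return;
    }

    String y = latestY.read();
    if (y != null) {
      // Emit DataX joined with the latest known DataY for the same ID.
      out.output(KV.of(element.getKey(), payload + "|" + y));
    }
    // else: no DataY seen for this ID yet - drop or buffer, depending on requirements.
  }
}

You would apply it with ParDo.of(new MatchLatestYFn()) on the flattened, keyed 
PCollection.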


If you need to be sure that each DataX event is matched against the 
latest DataY (and most of the time it is likely you don't have this 
requirement), then you can:


 a) buffer DataX in a BagState and use timers to flush the state after 
some timeout, or


 b) use @DoFn.RequiresTimeSortedInput [2] (if your runner supports 
it), which will do the buffering for you and pass the elements into the 
@ProcessElement method sorted by event timestamp


In both cases it is worth thinking about how you want to handle late data 
(i.e. data that arrives after the watermark, or after an element was already 
matched, but against the wrong element). Solution (b) simply drops the late 
element (which might not be what you want), or introduces latency 
defined by allowedLateness. Another option would be to implement 
retractions and process them downstream. I implemented something like 
that in [3].


Hope that helps,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

[2] 
https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html


[3] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter4/src/main/java/com/packtpub/beam/chapter4/StreamingInnerJoin.java


On 1/4/23 16:28, Ifat Afek (Nokia) wrote:


Thanks Sören,

I already saw your stack overflow question while trying to find a 
solution 😊


I prefer a solution that does not involve an external cache like 
Redis, if possible.


Best Regards,

Ifat

*From: *Sören Henning 
*Reply-To: *"user@beam.apache.org" 
*Date: *Tuesday, 3 January 2023 at 14:56
*To: *"user@beam.apache.org" 
*Subject: *Re: Join streams with different frequencies

Hi,

while I cannot provide you with a definite answer to your question, 
maybe my Stack Overflow question is interesting for you: 
https://stackoverflow.com/questions/66067263/how-to-join-a-frequently-updating-stream-with-an-irregularly-updating-stream-i


Best regards,
Sören

Am 03.01.23 um 09:15 schrieb Ifat Afek (Nokia):

Hi,

We are trying to implement the following use case:

We have a stream of DataX events that arrive every 5 minutes and
require some processing. Each event holds data for a specific
non-unique ID (we keep getting updated data for each ID). There
might be up to 1,000,000 IDs.

In addition, there is a stream of DataY events for some of these
IDs, that arrive in a variable frequency. Could be after a minute
and then again after 5 hours.

We would like to join the current DataX and latest DataY events by
ID (and process only IDs that have both DataX and DataY events).

We thought of holding a state of DataY events per ID in a global
window, and then use it as a side input for filtering the DataX
events stream. The state should hold the latest (by timestamp)
DataY event that arrived.

The problem is: if we are using discardingFiredPanes(), then each
DataY event is fired only once and cannot be reused later on for
filtering. On the other hand, if we are using
accumulatingFiredPanes(), then a list of all DataY events that
arrived is fired.

Are we missing something? what is the best practice for combining
two streams, one with a variable frequency?

Thanks,

Ifat


Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

2022-09-21 Thread Jan Lukavský
dingServerCallListener.java:40)
        at 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
        at 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)
```

Does anyone know which things I would also need to configure? Thanks


Sincerely,
Lydian Lee



On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský  wrote:

Hi Lydian,

there are two parts involved.

 a) expansion service (which you run on port 8097) - this service
expands the ReadFromKafka which is Java transform

 b) Java SDK environment, which is not the expansion service, it
must be some environment that is able to run the Java
ReadFromKafka transform. In flink, you can use PROCESS environment
type (e.g. [1]), but there might be other options (e.g. DOCKER),
see [2]

Hope this helps,

 Jan

[1]

https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py

[2] https://beam.apache.org/documentation/runtime/sdk-harness-config/

On 9/20/22 10:45, Lydian wrote:

Hi,
I am using the portable runner (Flink) with the Python SDK. I am on
the latest version of Beam (2.41).
The job is running on kubernetes. I launched the job manager with
sidecar container (using
image: apache/beam_flink1.14_job_server:2.41.0) to start the
expansion service with following command:
```
java
-cp /opt/apache/beam/jars/
org.apache.beam.sdk.expansion.service.ExpansionService
8097
--javaClassLookupAllowlistFile=*
--defaultEnvironmentType=EXTERNAL
--defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
ReadFromKafka(
consumer_config={
"bootstrap.servers": 'BROKER',
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.jaas.config":
f'org.apache.kafka.common.security.scram.ScramLoginModule
required username="{sasl_username}" password="{sasl_password}";',
},
topics=[self.options.topic],
with_metadata=False,
expansion_service="localhost:8097"
)
```
But it shows with error:
```
2022-09-20 08:36:36,549 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
Source: Impulse -> [3]Reading message from

kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
KafkaIO.ReadSourceDescriptors} (1/1)
(da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to
FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal
(dataPort=43553).
org.apache.flink.util.SerializedThrowable:
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException:
UNIMPLEMENTED: Method not found:
org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
at

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
~[?:?]
at

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
~[?:?]
at

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
~[?:?]
at

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
~[?:?]
at

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
~[?:?]
at

org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:451)
~[?:?]
at

org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:436)
~[?:?]
at

org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
~[?:?]
at

org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
~[?:?]
at

org.apache.beam.runners.fnexecution.cont

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

2022-09-20 Thread Jan Lukavský

Hi Lydian,

there are two parts involved.

 a) the expansion service (which you run on port 8097) - this service 
expands the ReadFromKafka transform, which is a Java transform


 b) a Java SDK harness environment, which is not the expansion service; it must 
be an environment that is able to run the Java ReadFromKafka 
transform. In Flink, you can use the PROCESS environment type (e.g. [1]), 
but there are other options (e.g. DOCKER), see [2]


Hope this helps,

 Jan

[1] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py


[2] https://beam.apache.org/documentation/runtime/sdk-harness-config/

On 9/20/22 10:45, Lydian wrote:

Hi,
I am using the portable runner (Flink) with the Python SDK. I am on the latest 
version of Beam (2.41).
The job is running on kubernetes. I launched the job manager with 
sidecar container (using 
image: apache/beam_flink1.14_job_server:2.41.0) to start the expansion 
service with following command:

```
java
-cp /opt/apache/beam/jars/
org.apache.beam.sdk.expansion.service.ExpansionService
8097
--javaClassLookupAllowlistFile=*
--defaultEnvironmentType=EXTERNAL
--defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
ReadFromKafka(
consumer_config={
"bootstrap.servers": 'BROKER',
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.jaas.config": 
f'org.apache.kafka.common.security.scram.ScramLoginModule required 
username="{sasl_username}" password="{sasl_password}";',

},
topics=[self.options.topic],
with_metadata=False,
expansion_service="localhost:8097"
)
```
But it shows with error:
```
2022-09-20 08:36:36,549 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: 
Impulse -> [3]Reading message from 
kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), 
KafkaIO.ReadSourceDescriptors} (1/1) 
(da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to 
FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal 
(dataPort=43553).
org.apache.flink.util.SerializedThrowable: 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: 
UNIMPLEMENTED: Method not found: 
org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) 
~[?:?]
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) 
~[?:?]
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) 
~[?:?]
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) 
~[?:?]
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) 
~[?:?]
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:451) 
~[?:?]
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:436) 
~[?:?]
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) 
~[?:?]
at 
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) 
~[?:?]
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) 
~[?:?]
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) 
~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-dist_2.12-1.14.5.jar:1.14.5]

at java.lang.Thread.run(Thread.java:748) ~[

Re: sink triggers

2022-07-25 Thread Jan Lukavský

Hi Sigalit,

there are several options; which one is best depends on the actual 
use case. You might:


 a) use the CoGroupByKey transform with a fixed window [1], or

 b) use stateful processing [2] with a timer triggering your output

The right choice mostly depends on whether you can use the windowing semantics 
provided by Beam [3]. Windowing is needed for the CoGBK approach; the 
stateful approach works with globally windowed PCollections.
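
A self-contained sketch of option a) with 5-minute fixed windows; the Create 
inputs and the formatting are placeholders - in your pipeline the inputs would 
be your sources and the formatted result would go to a KafkaIO.write():

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class WindowedCoGbkSketch {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Placeholder inputs; in the real pipeline these are your (keyed) sources.
    PCollection<KV<String, String>> first =
        p.apply("First", Create.of(KV.of("id1", "a"), KV.of("id2", "b")));
    PCollection<KV<String, String>> second =
        p.apply("Second", Create.of(KV.of("id1", "x")));

    Window<KV<String, String>> window =
        Window.into(FixedWindows.of(Duration.standardMinutes(5)));

    TupleTag<String> firstTag = new TupleTag<>();
    TupleTag<String> secondTag = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(firstTag, first.apply("WindowFirst", window))
            .and(secondTag, second.apply("WindowSecond", window))
            .apply(CoGroupByKey.create());

    // One combined element per key and window; format it and write it out
    // (the KafkaIO.write() part is omitted here).
    joined.apply(
        "Format",
        MapElements.into(TypeDescriptors.strings())
            .via(
                (KV<String, CoGbkResult> kv) ->
                    kv.getKey()
                        + ": "
                        + kv.getValue().getAll(firstTag)
                        + " / "
                        + kv.getValue().getAll(secondTag)));

    p.run().waitUntilFinish();
  }
}

With the default trigger, each window emits exactly one combined result per key 
once the watermark passes the end of the window.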


Hope this helps; feel free to ask any further questions you might have.

Best,

 Jan

[1] 
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/


[2] https://beam.apache.org/blog/stateful-processing/

[3] https://beam.apache.org/documentation/programming-guide/#windowing

On 7/25/22 14:09, Sigalit Eliazov wrote:

Hi all,

I have a pipeline that reads input from a few sources, combines them 
and creates a view of the data.

I need to send an output to kafka every X minutes.
What will be the best way to implement this?
Thanks
Sigalit



Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-07 Thread Jan Lukavský

Hi Gorjan,

sorry for the delay. What is the input of the Pipeline? Does the job 
stop reading from the source, or stop processing the elements? Can you verify 
that the TF code is not simply busy doing computation?


 Jan

On 6/3/22 11:05, Gorjan Todorovski wrote:

Hi Jan,

This is a batch job so no windows. It is basically a job launched by a 
TFX component, so I don't have control over Beam code being executed.
I conclude that the job is stuck, since the number of bytes and 
processed rows do not move for a long time on a specific task and 
subtask (always the same one).


Thanks,
Gorjan


On Thu, Jun 2, 2022 at 4:45 PM Jan Lukavský  wrote:

-user@flink <http://u...@flink.apache.org> as this looks like
purely beam issue

Could you please elaborate more about what "stuck" means? Does the
watermark stop progressing? Does that happen at any specific
instant (e.g. end of window or end of window + allowed lateness)?

On 6/1/22 15:43, Gorjan Todorovski wrote:

Hi Jan,

I have not checked the harness log. I have now checked it *Apache
Beam worker log) and found this, but currently not sure what it
means:

2022/06/01 13:34:40 Python exited: 
2022/06/01 13:34:41 Python exited: 
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in
_bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File

"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
line 587, in 
    target=lambda: self._read_inputs(elements_iterator),
  File

"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py",
line 570, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py",
line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py",
line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous
of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string =
"{"created":"@1654090485.252525992","description":"Error received
from peer ipv4:127.0.0.1:44439

<http://127.0.0.1:44439>","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer
hanging up","grpc_status":1}"
>

2022/06/01 13:34:45 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:47 Python exited: 
Starting worker with command ['/opt/apache/beam/boot',
'--id=3-1', '--logging_endpoint=localhost:44267',
'--artifact_endpoint=localhost:36413',
'--provision_endpoint=localhost:42179',
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot',
'--id=3-3', '--logging_endpoint=localhost:38683',
'--artifact_endpoint=localhost:44867',
'--provision_endpoint=localhost:34833',
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot',
'--id=3-2', '--logging_endpoint=localhost:35391',
'--artifact_endpoint=localhost:46571',
'--provision_endpoint=localhost:44073',
'--control_endpoint=localhost:44133']
Starting work...

On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský  wrote:

Hi Gorjan,

+user@beam <mailto:user@beam.apache.org>

The trace you posted is just waiting for a bundle to finish
in the SDK harness. I would suspect there is a problem in the
logs of the harness. Did you look for possible errors there?

 Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:

Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses
Apache Beam for data processing which in turn has a Flink
Runner (Basically a batch job on a Flink Session Cluster on
Kubernetes) version 1.13.6, but the job (for gathering
stats) gets stuck.

There is nothing significant in the Job Manager or Task
Manager logs. The only thing that possibly might tell why
the task is stuck seems to be a thread dump:

"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-02 Thread Jan Lukavský

-user@flink  as this looks like a purely Beam issue

Could you please elaborate more about what "stuck" means? Does the 
watermark stop progressing? Does that happen at any specific instant 
(e.g. end of window or end of window + allowed lateness)?


On 6/1/22 15:43, Gorjan Todorovski wrote:

Hi Jan,

I have not checked the harness log. I have now checked it *Apache Beam 
worker log) and found this, but currently not sure what it means:


2022/06/01 13:34:40 Python exited: 
2022/06/01 13:34:41 Python exited: 
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in 
_bootstrap_inner

    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", 
line 587, in 

    target=lambda: self._read_inputs(elements_iterator),
  File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", 
line 570, in _read_inputs

    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 
416, in __next__

    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 
803, in _next

    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of 
RPC that terminated with:

status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = 
"{"created":"@1654090485.252525992","description":"Error received from 
peer ipv4:127.0.0.1:44439 
<http://127.0.0.1:44439>","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer 
hanging up","grpc_status":1}"

>

2022/06/01 13:34:45 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:46 Python exited: 
2022/06/01 13:34:47 Python exited: 
Starting worker with command ['/opt/apache/beam/boot', '--id=3-1', 
'--logging_endpoint=localhost:44267', 
'--artifact_endpoint=localhost:36413', 
'--provision_endpoint=localhost:42179', 
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', 
'--logging_endpoint=localhost:38683', 
'--artifact_endpoint=localhost:44867', 
'--provision_endpoint=localhost:34833', 
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot', '--id=3-2', 
'--logging_endpoint=localhost:35391', 
'--artifact_endpoint=localhost:46571', 
'--provision_endpoint=localhost:44073', 
'--control_endpoint=localhost:44133']

Starting work...

On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský  wrote:

Hi Gorjan,

+user@beam <mailto:user@beam.apache.org>

The trace you posted is just waiting for a bundle to finish in the
SDK harness. I would suspect there is a problem in the logs of the
harness. Did you look for possible errors there?

 Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:

Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses
Apache Beam for data processing which in turn has a Flink Runner
(Basically a batch job on a Flink Session Cluster on Kubernetes)
version 1.13.6, but the job (for gathering stats) gets stuck.

There is nothing significant in the Job Manager or Task Manager
logs. The only thing that possibly might tell why the task is
stuck seems to be a thread dump:

"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at sun.misc.Unsafe.park(Native Method)
- waiting on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at

java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at

java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at

org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
...
I use 32 parallel degrees. Task managers are set, so each TM runs
in one container with 1 CPU and a total process memory set to 20
GB. Each TM runs 1 tasksslot.
T

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-01 Thread Jan Lukavský

Hi Gorjan,

+user@beam 

The trace you posted is just waiting for a bundle to finish in the SDK 
harness. I would suspect there is a problem in the logs of the harness. 
Did you look for possible errors there?


 Jan

On 5/31/22 13:54, Gorjan Todorovski wrote:

Hi,

I am running a TensorFlow Extended (TFX) pipeline which uses Apache 
Beam for data processing which in turn has a Flink Runner (Basically a 
batch job on a Flink Session Cluster on Kubernetes) version 1.13.6, 
but the job (for gathering stats) gets stuck.


There is nothing significant in the Job Manager or Task Manager logs. 
The only thing that possibly might tell why the task is stuck seems to 
be a thread dump:


"MapPartition (MapPartition at [14]{TFXIORead[train], 
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on 
java.util.concurrent.CompletableFuture$Signaller@6f078632

at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.CompletableFuture$Signaller@6f078632
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)

...
I use 32 parallel degrees. Task managers are set, so each TM runs in 
one container with 1 CPU and a total process memory set to 20 GB. Each 
TM runs 1 tasksslot.
This is failing with ~100 files with a total size of about 100 GB. If 
I run the pipeline with a smaller number of files to process, it runs ok.
I need Flink to be able to process different amounts of data as it is 
able to scale by automatically adding pods depending on the parallel 
degree setting for the specific job (I set the parallel degree to the 
max(number of files,32))

Thanks,
Gorjan

Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský
Yes, I suppose it might be more complex than the code snippet; that was 
just to demonstrate the idea. Also, the "exportRDD" would probably return 
WindowedValue<T> instead of plain T.


On 5/24/22 17:23, Reuven Lax wrote:
Something like this seems reasonable. Beam PCollections also have a 
timestamp associated with every element, so the importRDD function 
probably needs a way to specify the timestamp (could be an attribute 
name for dataframes or a timestamp extraction function for regular RDDs).


On Tue, May 24, 2022 at 2:40 AM Jan Lukavský  wrote:

Hi,
I think this feature is valid. Every runner for which Beam is not
a 'native' SDK uses some form of translation context, which maps
PCollection to internal representation of the particular SDK of
the runner (RDD in this case). It should be possible to "import"
an RDD into the specific runner via something like

  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the
point, actually. This would enable using features and libraries,
that Beam does not have, or micro-optimize some particular step
using runner-specific features, that we don't have in Beam. We
actually had this feature (at least in a prototype) many years ago
when Euphoria was a separate project.

 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's
worth considering building some Spark runner-specific feature
around this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner specific feature is a good way to do
this since the other runners won’t be able to support it or I’m
missing something?


There could be other interesting integrations in this space too,
e.g. using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or
FlinkIO/whatever) to read/write data from/to Spark data
structures for such cases (Spark schema to Beam schema convention
also could be supported). And dreaming a bit more, for those who
need to have a mixed pipeline (e.g. Spark + Beam) such connectors
could support the push-downs of pure Spark pipelines and then use
the result downstream in Beam.

—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw
 wrote:

The easiest way to do this would be to write the RDD
somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao
 wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But
assume I only use the spark runner.
>
> I have a spark library (very complex) that emits a spark
dataframe (or RDD).
> I also have an existing complex beam pipeline that can do
post processing on the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with.
The question is, how can I convert a spark RDD into a
pcollection?
>
> Thanks
> -Yushu
>



Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský

+dev@beam <mailto:d...@beam.apache.org>

On 5/24/22 11:40, Jan Lukavský wrote:


Hi,
I think this feature is valid. Every runner for which Beam is not a 
'native' SDK uses some form of translation context, which maps 
PCollection to internal representation of the particular SDK of the 
runner (RDD in this case). It should be possible to "import" an RDD 
into the specific runner via something like


  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the point, 
actually. This would enable using features and libraries, that Beam 
does not have, or micro-optimize some particular step using 
runner-specific features, that we don't have in Beam. We actually had 
this feature (at least in a prototype) many years ago when Euphoria 
was a separate project.


 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's 
worth considering building some Spark runner-specific feature around 
this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner specific feature is a good way to do this 
since the other runners won’t be able to support it or I’m missing 
something?


There could be other interesting integrations in this space too, 
e.g. using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or 
FlinkIO/whatever) to read/write data from/to Spark data structures 
for such cases (Spark schema to Beam schema convention also could be 
supported). And dreaming a bit more, for those who need to have a 
mixed pipeline (e.g. Spark + Beam) such connectors could support the 
push-downs of pure Spark pipelines and then use the result downstream 
in Beam.


—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
 wrote:


The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But assume
I only use the spark runner.
>
> I have a spark library (very complex) that emits a spark
dataframe (or RDD).
> I also have an existing complex beam pipeline that can do post
processing on the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with. The
question is, how can I convert a spark RDD into a pcollection?
>
> Thanks
> -Yushu
>



Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský

Hi,
I think this feature is valid. Every runner for which Beam is not a 
'native' SDK uses some form of translation context, which maps 
PCollection to internal representation of the particular SDK of the 
runner (RDD in this case). It should be possible to "import" an RDD into 
the specific runner via something like


  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the point, 
actually. This would enable using features and libraries, that Beam does 
not have, or micro-optimize some particular step using runner-specific 
features, that we don't have in Beam. We actually had this feature (at 
least in a prototype) many years ago when Euphoria was a separate project.


 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's 
worth considering building some Spark runner-specific feature around 
this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner specific feature is a good way to do this 
since the other runners won’t be able to support it or I’m missing 
something?


There could be other interesting integrations in this space too, e.g. 
using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or 
FlinkIO/whatever) to read/write data from/to Spark data structures for 
such cases (Spark schema to Beam schema convention also could be 
supported). And dreaming a bit more, for those who need to have a 
mixed pipeline (e.g. Spark + Beam) such connectors could support the 
push-downs of pure Spark pipelines and then use the result downstream 
in Beam.


—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
 wrote:


The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But assume I
only use the spark runner.
>
> I have a spark library (very complex) that emits a spark
dataframe (or RDD).
> I also have an existing complex beam pipeline that can do post
processing on the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with. The
question is, how can I convert a spark RDD into a pcollection?
>
> Thanks
> -Yushu
>



Re: Beam slowness compared to flink-native

2022-05-12 Thread Jan Lukavský

Hi Ifat,

can you try adding the 'use_deprecated_read' experiment to the 
PipelineOptions? IIRC the default expansion of KafkaIO uses a splittable 
DoFn now, which could be the cause of the performance difference you 
see. You can add the option on the command line using 
"--experiments=use_deprecated_read".


 Jan
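
PS: programmatically, the same can be done roughly like this, based on your 
pipeline code (ExperimentalOptions is in org.apache.beam.sdk.options):

StreamingOptions options = PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
options.setStreaming(true);
options.setRunner(FlinkRunner.class);
// Forces KafkaIO to expand to the legacy UnboundedSource-based read.
ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_deprecated_read");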

On 5/11/22 16:20, Afek, Ifat (Nokia - IL/Kfar Sava) wrote:


Hi,

Thanks for the tip. I enabled the fasterCopy parameter and the 
performance improved from 30,000 strings per second to 35,000-40,000. 
There is still a huge difference compared to native flink (150,000 
strings per second), which I don’t understand.


I opened a bug for that: https://issues.apache.org/jira/browse/BEAM-14438

Thanks,

Ifat

*From: *Talat Uyarer 
*Reply-To: *"user@beam.apache.org" 
*Date: *Wednesday, 11 May 2022 at 3:04
*To: *"user@beam.apache.org" 
*Subject: *Re: Beam slowness compared to flink-native

HI Ifat,

Did you enable fasterCopy parameter ?

Please look at this issue: 
https://issues.apache.org/jira/browse/BEAM-11146


Thanks

On Mon, May 2, 2022 at 12:57 AM Afek, Ifat (Nokia - IL/Kfar Sava) 
 wrote:


Hi,

I’m investigating a slowness of our beam pipelines, and as part of
that I tried to compare a very simple beam pipeline with an
equivalent flink-native pipeline. Both pipelines should read
strings from one kafka topic and write them to another topic. I’m
using beam 2.38.0 and flink 1.13.

I tried running each pipeline separately, on a single task manager
with a single slot and parallelism 1. What I saw is that Flink
native runs 5 times faster than beam (150,000 strings per second
in flink comparing to 30,000 in beam).

I’ll be happy if you can help me figure out why there is such a
difference. Maybe the pipelines are not really equivalent, or the
beam configuration is wrong?

Flink native pipeline:

public void process() throws Exception {

StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", kafkaAddress);

properties.setProperty("group.id", KAFKA_GROUP_ID);

FlinkKafkaConsumer<String> consumer = new
FlinkKafkaConsumer<>(INPUT_TOPIC, new SimpleStringSchema(),
properties);

  consumer.setStartFromEarliest();


consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

FlinkKafkaProducer<String> producer = new
FlinkKafkaProducer<>(kafkaAddress, OUTPUT_TOPIC, new
SimpleStringSchema());

DataStream<String> inputMessagesStream =
environment.addSource(consumer);

inputMessagesStream.addSink(producer);

environment.execute();

}

Beam pipeline:

public static void main(String[] args) {

try {

StreamingOptions options =
PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);

options.setStreaming(true);

options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

  PTransform<PBegin, PCollection<KV<String, String>>>
transform = KafkaIO.<String, String>read()

.withBootstrapServers(bootstrapServers)

.withTopic(inputTopic)

.withKeyDeserializer(StringDeserializer.class)

.withValueDeserializer(StringDeserializer.class)

.withConsumerConfigUpdates((ImmutableMap.of(

"auto.offset.reset", "earliest",

"group.id", consumerGroup)))

.withoutMetadata();

PCollection<KV<String, String>> input =
pipeline.apply("readFromKafka", transform);

PCollection<ProducerRecord<String, String>> convertedInput =

input.apply("ConvertToStringRecord",

ParDo.of(new ConvertToStringRecord(outputTopic) {}))

.setCoder(new ProducerRecordCoder<>(StringUtf8Coder.of(),
StringUtf8Coder.of()));

KafkaIO.WriteRecords<String, String> writeToAvro =
KafkaIO.<String, String>writeRecords()

.withBootstrapServers(bootstrapServers)

.withTopic(outputTopic)

.withKeySerializer(StringSerializer.class)

.withValueSerializer(StringSerializer.class);

convertedInput.apply("writeToKafka", writeToAvro);

pipeline.run();

} catch (Exception e) {

log.atError().withThrowable(e).log("Exception thrown while running
pipeline PipelineStringToString");

}

}

@Log4j2

@AllArgsConstructor

public class ConvertToStringRecord extends DoFn<KV<String, String>, ProducerRecord<String, String>> {

private String topic;

private static ProducerRecord<String, String> getRecord(KV<String, String> message, String topic) {

String string = message.getValue();

ProducerRecord<String, String> pr = new ProducerRecord<>(topic,
message.getKey(), string) {};

  pr.headers().add("__TypeId__",
String.class.getName().getBytes(StandardCharsets.UTF_8));

return pr;

}

@ProcessElement

public void processElement(ProcessContext c) {

try {

   ProducerRecord<String, String> pr =
getRecord(Objects.requireNonNull(c.element()), topic);

c.output(pr);

} catch (Exception e) {

log.atError().withThrowable(e).log("exception thrown while
processing string");

Re: session window question

2022-04-27 Thread Jan Lukavský

Hi Sigalit,

if I understand correctly, what you want is deduplication of the outputs 
coming from your GBK logic, is that correct? If so, you can use a 
stateful DoFn with a ValueState holding the last value seen per key in the 
global window. There is an implementation of this approach in 
Deduplicate.KeyedValues [1].
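
A minimal sketch of that stateful approach, assuming the aggregated output is 
KV<String, String>; note that without a cleanup timer the per-key state lives 
forever in the global window:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class EmitOnChangeFn extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("lastSent")
  private final StateSpec<ValueState<String>> lastSentSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("lastSent") ValueState<String> lastSent,
      OutputReceiver<KV<String, String>> out) {

    String previous = lastSent.read();
    if (!element.getValue().equals(previous)) {
      // Value changed (or was never sent) - remember it and emit downstream.
      lastSent.write(element.getValue());
      out.output(element);
    }
    // else: identical to what was already sent for this key - suppress it.
  }
}

Applied with ParDo.of(new EmitOnChangeFn()) right before the Kafka write, this 
replaces the external DB check.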


 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.38.0/org/apache/beam/sdk/transforms/Deduplicate.KeyedValues.html


On 4/27/22 13:36, Sigalit Eliazov wrote:

Hi all
I have the following scenario:
a. a pipeline that reads messages from Kafka, with a session window of 
1 minute duration.

b. a GroupByKey in order to aggregate the data
c. for each 'group' I do some calculation and build a new event to 
send to Kafka.


the output of this cycle is
key1 - value1
key2 - value2

If a new message arrives with the same key, I would like to have logic 
that checks:
if the current message is key1-value1, don't send it (because it was 
already sent).

Currently we implemented this using a DB (Postgres).
The performance of this implementation is not very good.
Is there any way to implement this without any external state?

thanks a lot
Sigalit


Re: Beam on Flink not processing input splits in parallel

2022-03-09 Thread Jan Lukavský
There are two "kinds" of splits in SDF - one splits the restriction 
*before* it is processed and the other *during* processing. The first 
one is supported (it is needed for correctness); the other is, in the 
bounded case, only an optimization and is not currently supported. It 
seems to me that it should be possible to pre-split on filenames, which 
should then be processed in parallel. Unfortunately I'm not that 
familiar with the Python SDK's fileio, so I'd rather leave the more detailed 
answer to someone else. But otherwise what you say makes sense to me.


 Jan

On 3/9/22 17:26, Janek Bevendorff wrote:

Thanks for the response! That's what I feared was going on.

I consider this a huge shortcoming, particularly because it does not 
only affect users with large files like you said. The same happens 
with many small files, because file globs are also fused to one 
worker. The only way to process files in parallel is to write a 
PTransform that does MatchFiles(file_glob) | Reshuffle() | 
ReadAllFromText(). A simple ReadFromText(file_glob) would not be run 
in parallel.


In fact, if you feed multiple textfile sources to your job, not only 
will each of these inputs process its files on one worker, but even 
the inputs are fused together. So even if you resolved the glob 
locally and then added one input for each individual file, all of that 
would still run sequentially.



On 09/03/2022 17:14, Jan Lukavský wrote:

Hi Janek,

I think you hit a deficiency in the FlinkRunner's SDF implementation. 
AFAIK the runner is unable to do dynamic splitting, which is what you 
are probably looking for. What you describe essentially works in the 
model, but FlinkRunner does not implement the complete contract to 
make use of the ability to split a large file into multiple parts and 
process them in parallel. I'm afraid there is no simple solution 
currently, other than what you described. The dynamic splitting might 
be introduced in some future release so that it starts working as you 
expect out of the box. This mostly affects users with a few large 
files; if you can parallelize on the files themselves, then it should 
work fine (which is what you observe).


 Jan

On 3/9/22 16:44, Janek Bevendorff wrote:
I went through all Flink and Beam documentation I could find to see 
if I overlooked something, but I could not get the text input source 
to unfuse the file input splits. This creates a huge input 
bottleneck, because one worker is busy reading records from a huge 
input file while 99 others wait for input and I can only shuffle the 
generated records, not the actual file input splits.


To fix it, I wrote a custom PTransform that globs files, optionally 
shuffles the file names, generates fixed-size split markers, 
shuffles the markers, and then reads the lines from these splits. 
This works well, but it feels like a hack, because I think Beam 
should be doing that out of the box. My implementation of the file 
reader is also much simpler and relies on IOBase.readline(), which 
keeps the code short and concise, but also loses a lot of 
flexibility compared to the Beam file reader implementation (such as 
support for custom line delimiters).


Any other ideas how I can solve this without writing custom 
PTransforms?


Janek


On 08/03/2022 14:11, Janek Bevendorff wrote:

Hey there,

According to the docs, when using a FileBasedSource or a splittable 
DoFn, the runner is free to initiate splits that can be run in 
parallel. As far as I can tell, the former is actually happening on 
my Apache Flink cluster, but the latter is not. This causes a 
single Taskmanager to process all splits of an input text file. Is 
this known behaviour and how can I fix this?


I have a pipeline that looks like this (Python SDK):

(pipeline
| 'Read Input File' >> textio.ReadFromText(input_glob, 
min_bundle_size=1)

| 'Reshuffle Lines' >> beam.Reshuffle()
| 'Map Records' >> beam.ParDo(map_func))

The input file is a large, uncompressed plaintext file from a 
shared drive containing millions of newline-separated data records. 
I am running this job with a parallelism of 100, but it is 
bottlenecked by a single worker running ReadFromText(). The 
reshuffling in between was added to force Beam/Flink to parallelize 
the processing, but this has no effect on the preceding stage. Only 
the following map operation is being run in parallel. The stage 
itself is marked as having a parallelism of 100, but 99 workers 
finish immediately.


I had the same issue earlier with another input source, in which I 
match a bunch of WARC file globs and then iterate over them in a 
splittable DoFn. I solved the missing parallelism by adding an 
explicit reshuffle in between matching input globs and actually 
reading the individual files:


class WarcInput(beam.PTransform):
    def expand(self, pcoll):
    return pcoll | MatchFiles(self._file_pattern) | 
beam.Resh

Re: Beam on Flink not processing input splits in parallel

2022-03-09 Thread Jan Lukavský

Hi Janek,

I think you hit a deficiency in the FlinkRunner's SDF implementation. 
AFAIK the runner is unable to do dynamic splitting, which is what you 
are probably looking for. What you describe essentially works in the 
model, but FlinkRunner does not implement the complete contract to make 
use of the ability to split a large file into multiple parts and process them 
in parallel. I'm afraid there is no simple solution currently, other 
than what you described. Dynamic splitting might be introduced in 
some future release so that it starts working as you expect out of the 
box. This should affect mostly users with few large files; if you can 
parallelize on the files themselves, then it should work fine (which is what 
you observe).


 Jan

On 3/9/22 16:44, Janek Bevendorff wrote:
I went through all Flink and Beam documentation I could find to see if 
I overlooked something, but I could not get the text input source to 
unfuse the file input splits. This creates a huge input bottleneck, 
because one worker is busy reading records from a huge input file 
while 99 others wait for input and I can only shuffle the generated 
records, not the actual file input splits.


To fix it, I wrote a custom PTransform that globs files, optionally 
shuffles the file names, generates fixed-size split markers, shuffles 
the markers, and then reads the lines from these splits. This works 
well, but it feels like a hack, because I think Beam should be doing 
that out of the box. My implementation of the file reader is also much 
simpler and relies on IOBase.readline(), which keeps the code short 
and concise, but also loses a lot of flexibility compared to the Beam 
file reader implementation (such as support for custom line delimiters).


Any other ideas how I can solve this without writing custom PTransforms?

Janek


On 08/03/2022 14:11, Janek Bevendorff wrote:

Hey there,

According to the docs, when using a FileBasedSource or a splittable 
DoFn, the runner is free to initiate splits that can be run in 
parallel. As far as I can tell, the former is actually happening on 
my Apache Flink cluster, but the latter is not. This causes a single 
Taskmanager to process all splits of an input text file. Is this 
known behaviour and how can I fix this?


I have a pipeline that looks like this (Python SDK):

(pipeline
| 'Read Input File' >> textio.ReadFromText(input_glob, 
min_bundle_size=1)

| 'Reshuffle Lines' >> beam.Reshuffle()
| 'Map Records' >> beam.ParDo(map_func))

The input file is a large, uncompressed plaintext file from a shared 
drive containing millions of newline-separated data records. I am 
running this job with a parallelism of 100, but it is bottlenecked by 
a single worker running ReadFromText(). The reshuffling in between 
was added to force Beam/Flink to parallelize the processing, but this 
has no effect on the preceding stage. Only the following map 
operation is being run in parallel. The stage itself is marked as 
having a parallelism of 100, but 99 workers finish immediately.


I had the same issue earlier with another input source, in which I 
match a bunch of WARC file globs and then iterate over them in a 
splittable DoFn. I solved the missing parallelism by adding an 
explicit reshuffle in between matching input globs and actually 
reading the individual files:


class WarcInput(beam.PTransform):
    def expand(self, pcoll):
    return pcoll | MatchFiles(self._file_pattern) | 
beam.Reshuffle() | beam.ParDo(WarcReader())


This way I can at least achieve parallelism on file level. This 
doesn't work with a single splittable input file, of course, for 
which I would have to reshuffle somewhere inside of ReadFromText(). 
Do I really have to write a custom PTransform that generates initial 
splits, shuffles them, and then reads from those splits? I consider 
this somewhat essential functionality.


Any hints appreciated.

Janek



Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Jan Lukavský

+dev 

Hi Cristian,

the savepointPath should not be ignored. We need to verify whether the local 
environment supports savepoints (I suppose it does) and, in that case, 
use it. If it does not, we should throw an exception, as 
silently ignoring the savepoint is misleading.


Would you file a JIRA? Or possibly create a PR to fix this?

Best,

 Jan

On 2/3/22 07:12, Cristian Constantinescu wrote:

Hi everyone,

I've done some digging within the Beam source code. It looks like when 
the flinkMaster argument is not set, the savepointPath is not used at 
all. [1]


In fact the only time the savepointPath argument is used within all of 
Beam's source code is on lines 183 and 186 of the same file. [2]


Of course, I did all my testing locally on my dev box with the 
embedded Flink cluster that Beam starts, which from the looks of it, 
does NOT use the savepointPath at all.


If someone familiar with the code can confirm this finding, I can 
update the documentation to explicitly state that savepoint resuming 
is not supported locally.


I will do more testing around this with a real Flink cluster and see 
if the behavior is different than the one described in my first email.


Thanks,
Cristian

[1] 
https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
[2] 
https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183


On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu 
 wrote:


Hey Pavel,

Thanks for the quick reply. Pardon me as I cannot copy/paste
straight from the IDE, copying by hand:

KafkaIO.read()
    .withBootstrapServers("address")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ConfluentSchemaRegistryDesProvider.of(...))
    .withConsumerConfigUpdates(map)
    .withReadCommitted()
    .commitOffsetsInFinalize()
    .withProcessingTime();


The config map is:
enable.auto.commit -> false
group.id  -> some group
auto.offset.reset -> earliest
specific.avro.reader -> false


On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin
 wrote:

Hello Christian,

Thanks for posting here the detailed scenario of your
experiments. I think it may be important to share your KafkaIO
configuration here too. For example, are you setting this
config anyhow?

https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--

Best Regards,
Pavel Solomin

Tel: +351 962 950 692| Skype: pavel_solomin | Linkedin







On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu
 wrote:

Hi everyone,

I'm trying to figure out how pipeline state works with
Beam running on Flink Classic. Would appreciate some help
with the below.

My understanding is that on recovery (whether from a
checkpoint or savepoint), Flink recreates the operators (I
guess those are DoFns in Beam) with whatever state they
had when the pipeline crashed. For example the Kafka
operator might contain the latest *safe* offset to restart
from. But I'm not seeing this when I introduce exceptions
in the pipeline.

My pipeline is as follows:
1. Read a Kafka topic from start
2. Have a DoFn that stores all incoming messages in a BagState
3. Above DoFn triggers a timer set in such a way that it
triggers after there are a few checkpoints created and
kept because of --externalizeCheckpointsEnabled = true.
This timer handler then outputs the elements to the next
operator, in this case KafkaIo.Write.
4. Before the timer in #3 is executed, manually trigger an
exception (listen to another Kafka topic and throw any
time a new message comes in)

What I observe:
1. In #4 above, Flink tries to process the exception twice,
then stops the pipeline (because numberOfExecutionRetries = 2)
2. After the pipeline is stopped, I see the checkpoints
are kept in the configured directory
3. If I restart the pipeline (with --savepointPath = ):
3a. No messages are read from kafka, because the Kafka
reader reached the end of the topic during the first run
3b. StartBundles are not executed for my DoFn. Indicating
that the DoFn isn't even started
3c. The timer in #3 is never executed, hence there is data
loss as

Re: Flink JobManager getting issue while running a Beam pipeline

2022-01-18 Thread Jan Lukavský

Hi Sujay,

can you please provide more information about how you run the job? 
FlinkRunner is definitely compiled against Flink 1.12.2, but that should 
not be an issue. FlinkRunner contains flink-runtime; you should be fine 
simply excluding it and replacing it with version 1.12.7 (maybe better 
behavior here would be not to include the Flink dependencies in 
FlinkRunner and let the user provide them explicitly). Additionally, 
Flink classes are usually loaded using a "parent-first" classloader [1], 
which should mean that any classes in org.apache.flink.* are 
loaded from the server classpath. You can verify that you haven't 
accidentally changed that default and that your JM and TM are all on the 
same version.


Best,
 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default


On 1/18/22 07:35, sujay kulkarni wrote:

Hi,

I just dug into the code where this issue shows up, and I observed that 
2 different implementations of the class are getting loaded on the 
classpath.

The Beam Flink runner brings in the 1.12.2 version of flink-clients, 
which has this class definition, while at runtime the Flink server 
provides the 1.12.7 version of the same class.


1.12.2 method implementation coming from beam-flink-runner

private Builder(String[] programArguments) {
      this.programArguments =
Objects.requireNonNull(programArguments);
    }


1.12.7 method implementation present in FlinkServer

private Builder(String[] programArguments, Configuration
configuration) {
      this.programArguments =
Objects.requireNonNull(programArguments);
      this.configuration =
Objects.requireNonNull(configuration);
    }


But this should not happen, right? Beam FlinkRunner 2.29.0 supports 
Flink 1.12.x. Is it a bug, or am I missing something?

If it is a bug, has a JIRA been raised against this issue?

References:
https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink-1.12/2.29.0 

https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility 



Thanks & Regards,
Sujay

On Mon, Jan 17, 2022 at 11:58 PM Luke Cwik  wrote:

+user  -dev

This looks like you have a dependency conflict issue. Is that the
whole stack trace or is there more?
Also, in my experience Java only emits the full failure reason the
first time this dependency issue exists. Future failures are
effectively cached and any suppressed exceptions and other
failures are typically no longer available to help debug this
issue so finding the first failure can provide more detail.

Finally, https://jlbp.dev/ has great information about Java
dependency management best practices. It will only help slightly
with your immediate problem as it can explain concepts and why
things fail this way but it also provides lots of helpful
information to improve dependency management in general within
your projects.

On Mon, Jan 17, 2022 at 1:44 AM sujay kulkarni
 wrote:


Hi all,

I am trying a sample beam job using flinkRunner.
*Beam-version -- 2.29.0*
*Flink server -- 1.12.7*
When running the job, I am getting below issue in the
jobmanager logs

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever$Builder org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder(java.lang.String[], org.apache.flink.configuration.Configuration)'
        at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgramRetriever(StandaloneApplicationClusterEntryPoint.java:117)
        at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:103)
        at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:69)

It seems like a classpath issue. I checked the compatibility
matrix of the Beam Flink runner, and in version 2.29.0,
Flink 1.12.x is supported.

Please help me with the issue I am facing.

Thanks and Regards,
Sujay


Re: Problem with windowing in python

2022-01-17 Thread Jan Lukavský

Hi Giovanni,

one thing that I overlooked when answering your SF question is that your 
read_records method ignores the provided offset_range_tracker. That 
seems like it could be the root of the issue - FileBasedSource is 
based on splittable DoFn [1], where your logic must cooperate with the 
offset tracker to be able to split and checkpoint the reading of the source 
file.

Regarding GroupIntoBatches, I believe that should be the right 
solution if your intent is simply to batch the input to optimize 
some computation.


Hope this helps, please feel free to reach out if you have any more 
questions.


Best,

 Jan

[1] 
https://beam.apache.org/documentation/programming-guide/#splittable-dofns


On 1/17/22 11:12, giovani . wrote:

Hello, could someone help me with this problem:
https://stackoverflow.com/questions/70644351/apache-beam-hanging-on-groupbykey-after-windowing-not-triggering
?

In short, I am having problems with the Python direct runner aggregating a
data-driven window: after using a GroupByKey, the data is simply not
output.

Maybe I have some problem understanding Beam concepts, or it is a known
issue with the direct runner. Any help will be much appreciated, thank
you very much!


Re: Beam on Flink runner not able to advance watermarks on a high load

2021-11-30 Thread Jan Lukavský

Hi Sandeep,

if you are certain that the timestamps of the messages in the Kafka topic 
you are processing are correctly generated (which they probably are, if 
adding more resources 'fixes' the issue), I suggest you try to create a 
flame graph, or any other form of aggregated view over hotspots in a 
thread dump. You might also want to verify that your pipeline 
processes records from all partitions of your topic(s), so that no 
partition is accidentally left behind. But that is all that comes to my 
mind currently. If you have any more insights, do not hesitate to 
share them, so that the community can help figure this out, because that 
behavior 'feels' unexpected - at least to me.


Best,

 Jan

On 11/29/21 19:45, Kathula, Sandeep wrote:


Hi Jan,

  We are already using --experiments=use_deprecated_read and 
Flink is not advancing the watermarks.


Thanks,

Sandeep

*From: *Jan Lukavský 
*Date: *Thursday, November 18, 2021 at 4:03 AM
*To: *"Kathula, Sandeep" , 
"user@beam.apache.org" 
*Subject: *Re: Beam on Flink runner not able to advance watermarks on 
a high load


This email is from an external sender.

Hi Sandeep,

one more question, did you try to use 
--experiments=use_deprecated_read? If not, can you try that and check 
if it has any impact on the behavior you observe?


 Jan

On 11/18/21 01:41, Kathula, Sandeep wrote:

Hi Jan,

We are not adding any custom timestamp policy. We also don’t see
backpressure in the Flink UI. We are giving 5 task slots, each with 3
CPU and 32 GB RAM. It’s working if we give 10 task slots, each with
3 CPU and 32 GB RAM. But that’s a lot of resources for this load. We
are trying to figure out why Beam is not able to handle 10,000
records per second with 5 task slots.

Thanks,

Sandeep

*From: *Jan Lukavský  <mailto:je...@seznam.cz>
*Reply-To: *"user@beam.apache.org" <mailto:user@beam.apache.org>
 <mailto:user@beam.apache.org>
*Date: *Tuesday, November 16, 2021 at 3:11 AM
*To: *"user@beam.apache.org" <mailto:user@beam.apache.org>
 <mailto:user@beam.apache.org>
*Subject: *Re: Beam on Flink runner not able to advance watermarks
on a high load

This email is from an external sender.

Hi Sandeep,

- dev@beam <mailto:d...@beam.apache.org>

The watermark estimation itself should not be related to load. Can
you please clarify, if

 a) you are using any custom timestamp policy?

 b) you see any backpressure in Flink's UI? Backpressure could -
under some circumstances - cause delays in watermark propagation.
It _might_ help to increase parallelism in that case.

Best,

 Jan

On 11/15/21 18:22, Kathula, Sandeep wrote:

Hi,

We are running a Beam application on Flink runner (Beam 2.29
and Flink 1.12) which reads from Kafka and writes to S3  once
every 5 minutes. My window and s3 writes looks like

PCollection.apply("Batch Events",
Window.into(

FixedWindows.of(Duration.standardMinutes(5)))

.triggering(AfterWatermark.pastEndOfWindow())

.withAllowedLateness(Duration.ZERO,
Window.ClosingBehavior.FIRE_ALWAYS)

.discardingFiredPanes())

.apply(FileIO.write()

.via(ParquetIO.sink(schema))

.to(outputPath)

.withNumShards(5)

.withNaming(new CustomFileNaming("snappy.parquet")));

Resources allocated: 5 task slots each with 3 CPU and 32 GB
RAM. We are using RocksDB as state backend and giving 50% of
memory to off-heap.

It’s running fine with lighter loads. But when it gets a heavier
load from Kafka (7,500 or more records per second, each record
around 7 KB in size), we are seeing that no files are being
written to S3. We are using AfterWatermark.pastEndOfWindow(),
which triggers only when the watermark reaches the end of the
window.

After debugging, we found that watermarks are not being
advanced under heavy load, so the event-time trigger that fires
when the watermark reaches the end of the window - and hence the
S3 writes - never fires. The data therefore accumulates off-heap,
which results in out-of-memory errors after some time.

Can you please let us know why watermarks are not advancing
under high load.

Thanks,

Sandeep


Re: Beam on Flink runner not able to advance watermarks on a high load

2021-11-18 Thread Jan Lukavský

Hi Sandeep,

one more question, did you try to use --experiments=use_deprecated_read? 
If not, can you try that and check if it has any impact on the behavior 
you observe?


 Jan

On 11/18/21 01:41, Kathula, Sandeep wrote:


Hi Jan,

We are not adding any custom timestamp policy. We also don’t see 
backpressure in the Flink UI. We are giving 5 task slots, each with 3 CPU 
and 32 GB RAM. It’s working if we give 10 task slots, each with 3 CPU 
and 32 GB RAM. But that’s a lot of resources for this load. We are 
trying to figure out why Beam is not able to handle 10,000 records per 
second with 5 task slots.


Thanks,

Sandeep

*From: *Jan Lukavský 
*Reply-To: *"user@beam.apache.org" 
*Date: *Tuesday, November 16, 2021 at 3:11 AM
*To: *"user@beam.apache.org" 
*Subject: *Re: Beam on Flink runner not able to advance watermarks on 
a high load


This email is from an external sender.

Hi Sandeep,

- dev@beam <mailto:d...@beam.apache.org>

The watermark estimation itself should not be related to load. Can you 
please clarify, if


 a) you are using any custom timestamp policy?

 b) you see any backpressure in Flink's UI? Backpressure could - under 
some circumstances - cause delays in watermark propagation. It _might_ 
help to increase parallelism in that case.


Best,

 Jan

On 11/15/21 18:22, Kathula, Sandeep wrote:

Hi,

    We are running a Beam application on Flink runner (Beam 2.29
and Flink 1.12) which reads from Kafka and writes to S3  once
every 5 minutes. My window and s3 writes looks like

PCollection.apply("Batch Events",
Window.into(

FixedWindows.of(Duration.standardMinutes(5)))

.triggering(AfterWatermark.pastEndOfWindow())

.withAllowedLateness(Duration.ZERO,
Window.ClosingBehavior.FIRE_ALWAYS)

.discardingFiredPanes())

.apply(FileIO.write()

.via(ParquetIO.sink(schema))

.to(outputPath)

.withNumShards(5)

.withNaming(new CustomFileNaming("snappy.parquet")));

Resources allocated: 5 task slots each with 3 CPU and 32 GB RAM.
We are using RocksDB as state backend and giving 50% of memory to
off-heap.

It’s running fine with lighter loads. But when it gets a heavier load
from Kafka (7,500 or more records per second, each record around 7 KB
in size), we are seeing that no files are being written to S3. We
are using AfterWatermark.pastEndOfWindow(), which triggers
only when the watermark reaches the end of the window.

After debugging, we found that watermarks are not being advanced
under heavy load, so the event-time trigger that fires when the
watermark reaches the end of the window - and hence the S3 writes -
never fires. The data therefore accumulates off-heap, which results
in out-of-memory errors after some time.

Can you please let us know why watermarks are not advancing under
high load.

Thanks,

Sandeep


Re: Beam on Flink runner not able to advance watermarks on a high load

2021-11-16 Thread Jan Lukavský

Hi Sandeep,

- dev@beam 

The watermark estimation itself should not be related to load. Can you 
please clarify, if


 a) you are using any custom timestamp policy?

 b) you see any backpressure in Flink's UI? Backpressure could - under 
some circumstances - cause delays in watermark propagation. It _might_ 
help to increase parallelism in that case.


Best,

 Jan

On 11/15/21 18:22, Kathula, Sandeep wrote:


Hi,

    We are running a Beam application on Flink runner (Beam 2.29 and 
Flink 1.12) which reads from Kafka and writes to S3  once every 5 
minutes. My window and s3 writes looks like


PCollection.apply("Batch Events", 
Window.into(


FixedWindows.of(Duration.standardMinutes(5)))

.triggering(AfterWatermark.pastEndOfWindow())

.withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)

.discardingFiredPanes())

.apply(FileIO.write()

.via(ParquetIO.sink(schema))

.to(outputPath)

.withNumShards(5)

.withNaming(new CustomFileNaming("snappy.parquet")));

Resources allocated: 5 task slots each with 3 CPU and 32 GB RAM. We 
are using RocksDB as state backend and giving 50% of memory to off-heap.


It’s running fine with lighter loads. But when it gets a heavier load 
from Kafka (7,500 or more records per second, each record around 7 KB in 
size), we are seeing that no files are being written to S3. We are 
using AfterWatermark.pastEndOfWindow(), which triggers only when 
the watermark reaches the end of the window.

After debugging, we found that watermarks are not being advanced under 
heavy load, so the event-time trigger that fires when the watermark 
reaches the end of the window - and hence the S3 writes - never fires. 
The data therefore accumulates off-heap, which results in out-of-memory 
errors after some time.


Can you please let us know why watermarks are not advancing under high 
load.


Thanks,

Sandeep


Re: Stateful processing of session data in order

2021-10-18 Thread Jan Lukavský

Hi Fabian,

you can use the (experimental) @RequiresTimeSortedInput [1] annotation for 
that. I believe it should be supported by Dataflow in batch mode 
(because, as you noticed, Dataflow sorts inputs to a stateful DoFn in batch 
by default; maybe someone can correct me if I'm wrong). It is supported 
by the DirectRunner, so it should stabilize your tests.

I must emphasize that the annotation changes the way the stateful DoFn 
handles late vs. droppable data, as described in the javadoc. This 
should make no difference in batch mode (there is no watermark in batch), but 
attention has to be paid to this when running the pipeline in streaming 
mode. I'm not sure whether this is supported by Dataflow in 
streaming mode (the runner excludes tests for that, so it seems to me 
that it is not).


 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.33.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
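
To illustrate, a minimal sketch of how the annotation is applied (the DoFn and its types are made up; the annotation goes on the @ProcessElement method of a stateful DoFn):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn that relies on per-key elements arriving in event-time order.
public class OrderedPerKeyFn extends DoFn<KV<String, Long>, String> {

  @StateId("lastSeen")
  private final StateSpec<ValueState<Long>> lastSeenSpec = StateSpecs.value();

  @RequiresTimeSortedInput // the runner must feed each key's input sorted by event timestamp
  @ProcessElement
  public void process(
      @Element KV<String, Long> element,
      @StateId("lastSeen") ValueState<Long> lastSeen,
      OutputReceiver<String> out) {
    Long previous = lastSeen.read();
    lastSeen.write(element.getValue());
    out.output(element.getKey() + ": " + previous + " -> " + element.getValue());
  }
}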


On 10/15/21 11:05, Fabian Loris wrote:


Hi folks,

In my scenario I’m aggregating data in session windows and then 
windowing it back into the global window to process each key with a 
stateful DoFn. I’m currently doing this in batch, where the order of 
the data is important, so the session windows need to be processed in 
order. I couldn’t find a standard way of doing this, but my tests show 
that the Dataflow runner is correctly processing the data in order, 
but the DirectRunner is not. So there seems to be some difference in 
the behavior of these two runners. I know Apache Beam makes no 
guarantee on order, but it seems the Dataflow runner does?!


Can someone provide more details here, as I couldn’t find anything? Did I 
discover some undocumented behavior of the Dataflow runner? Would 
there be some alternative approach to what I'm doing right now (see below)?


Although Dataflow is behaving as I want it, I'm still not sure if I 
can/should rely on that. Furthermore, I can't test it in an automated 
way as the DirectRunner is behaving differently.


Below you can find some code snippets. I use Scala with Scio, but I 
believe it is still readable:


pipelineInput

.timestampBy(m => m.timestamp, allowedTimestampSkew = 
Duration.standardMinutes(1)) 
  .withSessionWindows(Duration.minutes(10),..)

.applyKvTransform(WithKeys.of(new SerializableFunction[Model, String] {

private[pipeline] override def apply(input: Model): String = input.key

 }))

.applyKvTransform(GroupByKey.create[String, Model]())

   .applyKvTransform(Window.remerge[KV[String, 
java.lang.Iterable[Model]]]())


  .withGlobalWindow() // back to global window

.applyTransform(ParDo.of(statefulDoFn)) // <- here the session windows 
should be processed in order


Thanks a lot for your help :)
Fabian


Re: [Question] Beam+Python+Flink

2021-10-18 Thread Jan Lukavský

Hi Chiara,

environment_type LOOPBACK is meant for local execution only. The default 
is DOCKER, which is not ideal when you use docker-compose (docker in 
docker), so the other option is to use the EXTERNAL environment. With this 
environment, you must manually start the Python SDK harness as a 
separate container using the apache/beam_python3.8_sdk docker image with 
args set to '--worker_pool'. That runs a container that will take 
care of starting the Python harness processes. By default it listens 
on port 50000; it must be accessible from the taskmanager container via 
localhost:50000, and you then pass that address via environment_config, e.g.:

 --environment_type=EXTERNAL --environment_config=localhost:50000

That should do the trick. Because of limitations of the 'worker_pool', 
you must make sure that it is accessible via the 'localhost' hostname. For 
more information see [1].


 Jan

[1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
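
As a sketch of what that can look like with the docker-compose file quoted below (untested here; the image tag is an assumption and should match your apache-beam version), one more service is added next to jobmanager/taskmanager:

  beam-worker-pool:
    image: apache/beam_python3.8_sdk:2.32.0   # match your local apache-beam version
    command: --worker_pool                    # starts the SDK harness worker pool (default port 50000)
    network_mode: "service:taskmanager"       # share the taskmanager's network namespace so localhost:50000 works
    depends_on:
      - taskmanager

The pipeline is then submitted with --environment_type=EXTERNAL --environment_config=localhost:50000 instead of LOOPBACK.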

On 10/18/21 11:43, Chiara Troiani wrote:


Hi,


I am trying to follow these tutorials

http://beam.apache.org/documentation/runners/flink/

For the Portable (Python)


I am not able to execute a Beam pipeline on a Flink cluster.

I am running a Flink Session Cluster with docker-compose,


This is my docker-compose file:


——

version: "2.2"

services:

  jobmanager:

    image: flink:1.13.2-scala_2.11

    ports:

      - "8081:8081"

    command: jobmanager

    environment:

      - |

        FLINK_PROPERTIES=

        jobmanager.rpc.address: jobmanager


  taskmanager:

    image: flink:1.13.2-scala_2.11

    depends_on:

      - jobmanager

    command: taskmanager

    scale: 1

    environment:

      - |

        FLINK_PROPERTIES=

        jobmanager.rpc.address: jobmanager

        taskmanager.numberOfTaskSlots: 2


—


I run the examples from a virtual environment, python3.8, 
apache-beam==2.32.0


macOS Catalina 10.15.7

Docker desktop 4.1.1


When I run:

python -m apache_beam.examples.wordcount --input=text.txt 
--output=out.txt --runner=FlinkRunner --flink_master=localhost:8081 
--environment_type=LOOPBACK



I get this error:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979

Re: Performance of Apache Beam

2021-10-18 Thread Jan Lukavský

Hi Azhar,

-dev  +user 

this kind of question cannot be answered in general. The overhead will 
depend on the job and the SDK you use. Using the Java SDK with the (classical) 
FlinkRunner should give the best performance on Flink, although the 
overhead will not be completely nullified. The way Beam is constructed - 
with portability being one of the main concerns - necessarily brings 
some overhead compared to a job written and optimized for a single 
runner only (using Flink's native API in this case). I'd suggest you 
evaluate the programming model and portability guarantees that Apache 
Beam gives you instead of pure performance. On the other hand, Apache 
Beam tries hard to minimize the overhead, so you should not expect 
*vastly* worse performance. I'd say the best way to go is to implement a 
simplistic pipeline somewhat representing your use case and then measure 
the performance of this specific instance.

Regarding fault tolerance and backpressure, the Apache Beam model does not 
handle those (with the exception of bundles being processed as atomic 
units), so these are delegated to the runner - FlinkRunner will 
therefore behave the way Apache Flink defines these concepts.


Hope this helps,

 Jan

On 10/17/21 17:53, azhar mirza wrote:

Hi Team
Could you please help me with the questions below.

I need to know the performance of Apache Beam vs. Flink: if we use Flink as 
the runner for Beam, what will be the additional overhead of converting Beam 
to Flink?

How are fault tolerance and resiliency handled in Apache Beam?
How does Apache Beam handle backpressure?

Thanks
Azhar

Re: side input terribly slow when using Flink runner

2021-10-12 Thread Jan Lukavský

Hi Stefan,

could you verify which coder you use for the PCollection that 
you materialize as a side input? I'm not sure from the flame graph itself, 
but could it be SerializableCoder?


 Jan
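
For reference, a quick sketch of how to check - and, if needed, pin - the coder of the side-input PCollection (MyValue is a placeholder for whatever type is being materialized):

import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class SideInputCoderCheck {

  // Placeholder for the value type materialized via View.asMap.
  public static class MyValue implements Serializable {
    public String field;
  }

  static PCollection<KV<String, MyValue>> pinCoder(PCollection<KV<String, MyValue>> sideInput) {
    // If this prints SerializableCoder, every access pays Java-serialization cost.
    System.out.println("side input coder: " + sideInput.getCoder());
    // Pin an explicit, cheaper coder instead of the inferred fallback.
    return sideInput.setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(MyValue.class)));
  }
}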

On 10/12/21 12:23 PM, Stefan Wachter wrote:

Hi,

I have a pipeline where a PCollection is fed as View.asMap into a
later stage. The performance is terribly bad. The attached flame graph
of the stage that uses that side input reveals that nearly all the time
is spent by the Flink runner serializing pipeline options. Is
something going terribly wrong?

--Stefan



Re: How can I gracefully stop unbounded KafkaIO consumer?

2021-09-30 Thread Jan Lukavský
Do you set --checkpointingInterval? I have seen similar behavior, but 
only when checkpointing is disabled (missing), see [1].


[1] https://issues.apache.org/jira/browse/BEAM-12053
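
For reference, a sketch of enabling checkpointing on the FlinkRunner (the 60-second interval is just an example value):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointingExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // Without a positive interval, Flink checkpointing stays disabled for the Beam job.
    options.setCheckpointingInterval(60_000L); // equivalent to --checkpointingInterval=60000
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here ...
    pipeline.run();
  }
}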

On 9/30/21 2:14 PM, Marco Costantini wrote:

Thanks Jan,
More than continuing safely, I need to be able to stop the jobs 
safely. Currently, doing so "blows up" the Task Manager. Blows up 
meaning that the exceptions stream so fast that the TaskManager shuts 
down for an unobserved reason : OOM? HDD space?


If I connect to kafka with KafkaIO, then click Cancel Job -> boom (the 
exceptions start streaming in logs)


I've tried 'stopping' the job via the REST API but it gives a response 
like "the module is already in Finished state. Not stopping". It is 
correct in that one of my two pipeline stages is finished but one is 
in RUNNING.


Any tips to clean this mess up?

On Thu, Sep 30, 2021 at 3:30 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote:


Hi Marco,

what is your intention? Do you want to upgrade the pipeline? Flink uses
checkpoints / savepoints (see [1]), so cancelling the pipeline with a
savepoint and then resuming from that savepoint should be safe. Another
option is to enable offset commits to Kafka via [2]. That way you should
be able to resume even without a savepoint (but you will lose any
internal pipeline state, so that is mostly useful for stateless
pipelines).

Would that work for you?

  Jan

[1]

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

<https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/>

[2]

https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--

<https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize-->

On 9/30/21 12:28 AM, Marco Costantini wrote:
> Using a FlinkRunner, if I cancel the job, the pipeline blows up.
> Exceptions stream across my screen rapidly.
>
> ```
> java.lang.InterruptedException: null
>         at
>
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944)
> ~[?:1.8.0_282]
>         at
>

org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584)

>

~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
> ```
> How can I gracefully stop my Flink+Beam job that uses an unbounded
> KafkaIO source?
>
> If it is not explicitly supported, are there any work-arounds?
>
> Please and thank you,
> Marco.



Re: How can I gracefully stop unbounded KafkaIO consumer?

2021-09-30 Thread Jan Lukavský

Hi Marco,

what is your intention? Do you want to upgrade the pipeline? Flink uses 
checkpoints / savepoints (see [1]), so cancelling the pipeline with a savepoint 
and then resuming from that savepoint should be safe. Another option is 
to enable offset commits to Kafka via [2]. That way you should be able to 
resume even without a savepoint (but you will lose any internal pipeline 
state, so that is mostly useful for stateless pipelines).

Would that work for you?

 Jan

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/


[2] 
https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--


On 9/30/21 12:28 AM, Marco Costantini wrote:
Using a FlinkRunner, if I cancel the job, the pipeline blows up. 
Exceptions stream across my screen rapidly.


```
java.lang.InterruptedException: null
        at 
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) 
~[?:1.8.0_282]
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) 
~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]

```
How can I gracefully stop my Flink+Beam job that uses an unbounded 
KafkaIO source?


If it is not explicitly supported, are there any work-arounds?

Please and thank you,
Marco.


Re: Importing dependencies of Python Pipeline

2021-09-25 Thread Jan Lukavský

Hi Robert,

-dev <mailto:d...@beam.apache.org>, as this seems to be really related to 
improper use. Thanks for the pointer (I somehow missed this in the 
docs). I tried --save_main_session, but without luck. When adding the 
flag, the serialization fails with

RecursionError: maximum recursion depth exceeded

My modules do not import one another in a cyclic way (in case that could cause 
this problem).

If I try to use the "standard" way through setup.py, I still get errors, 
even when I try to import the module inside the function (a DoFn, actually) as 
described in [1]. It looks like the module is not known, even though it 
is referenced in setup.py (via py_modules). Is there anything that I'm 
still doing wrong?


Thanks,

 Jan

[1] 
https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors

https://cloud.google.com/dataflow/docs/resources/faq#programming_with_the_cloud_dataflow_sdk_for_python

On 9/24/21 6:14 PM, Robert Bradshaw wrote:

On Fri, Sep 24, 2021 at 6:33 AM Jan Lukavský  wrote:

+dev

I hit a very similar issue even with a standard module (math). No matter where I 
put the import statement (even one line preceding the use), the module cannot 
be found and causes

NameError: name 'math' is not defined

This sounds like it was imported in the __main__ module, but
save_main_session was not used.


I therefore think that the --setup_file works fine, but that there is a more general 
problem (or a misunderstanding on my side) with importing modules. Can this be 
runner-dependent? I use FlinkRunner and submit jobs with 
--flink_submit_uber_jar; could that be the problem?

  Jan

On 9/23/21 3:12 PM, Jan Lukavský wrote:

Oops, sorry, the illustration of the three files is wrong. It was meant to be

src/

  |  script.py

  |  service_pb2.py

  |  service_pb2_grpc.py

The three files are in the same directory.

On 9/23/21 3:08 PM, Jan Lukavský wrote:

Hi,

I'm facing issues importing dependencies of my Python Pipeline. I intend to use 
gRPC to communicate with remote RPC service, hence I have the following project 
structure:

script.py

 | service_pb2.py

 | service_pb2_grpc.py

I created setup.py with something like

setup(name='...',
   version='1.0',
   description='...',
   py_modules=['service_pb2', 'service_pb2_grpc'])


That seems to work, it packages the dependencies, for example by 'python3 
setup.py sdist'. I pass this file to the Pipeline using --setup_file, but I 
have no luck using the module. Though the script is executed, it fails once I 
try to open a channel using (DoFn.setup):

   def setup(self):
 self.channel = grpc.insecure_channel(self.address)
 self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

with exception ModuleNotFoundError: No module named 'service_pb2_grpc'.

Am I doing something obviously wrong?

  Jan



Re: Importing dependencies of Python Pipeline

2021-09-24 Thread Jan Lukavský

+dev <mailto:d...@beam.apache.org>

I hit a very similar issue even with a standard module (math). No matter 
where I put the import statement (even one line preceding the use), the 
module cannot be found and causes

NameError: name 'math' is not defined

I therefore think that the --setup_file works fine, but that there is a more 
general problem (or a misunderstanding on my side) with importing 
modules. Can this be runner-dependent? I use FlinkRunner and submit jobs 
with --flink_submit_uber_jar; could that be the problem?


 Jan

On 9/23/21 3:12 PM, Jan Lukavský wrote:
Oops, sorry, the illustration of the three files is wrong. It was 
meant to be


src/

 |  script.py

 |  service_pb2.py

 |  service_pb2_grpc.py

The three files are in the same directory.

On 9/23/21 3:08 PM, Jan Lukavský wrote:

Hi,

I'm facing issues importing dependencies of my Python Pipeline. I 
intend to use gRPC to communicate with remote RPC service, hence I 
have the following project structure:


script.py

    | service_pb2.py

    | service_pb2_grpc.py

I created setup.py with something like

setup(name='...',
  version='1.0',
  description='...',
  py_modules=['service_pb2', 'service_pb2_grpc'])


That seems to work, it packages the dependencies, for example by 
'python3 setup.py sdist'. I pass this file to the Pipeline using 
--setup_file, but I have no luck using the module. Though the script 
is executed, it fails once I try to open a channel using (DoFn.setup):


  def setup(self):
    self.channel = grpc.insecure_channel(self.address)
    self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

with exception ModuleNotFoundError: No module named 'service_pb2_grpc'.

Am I doing something obviously wrong?

 Jan



Re: Importing dependencies of Python Pipeline

2021-09-23 Thread Jan Lukavský
Oops, sorry, the illustration of the three files is wrong. It was meant 
to be


src/

 |  script.py

 |  service_pb2.py

 |  service_pb2_grpc.py

The three files are in the same directory.

On 9/23/21 3:08 PM, Jan Lukavský wrote:

Hi,

I'm facing issues importing dependencies of my Python Pipeline. I 
intend to use gRPC to communicate with remote RPC service, hence I 
have the following project structure:


script.py

    | service_pb2.py

    | service_pb2_grpc.py

I created setup.py with something like

setup(name='...',
  version='1.0',
  description='...',
  py_modules=['service_pb2', 'service_pb2_grpc'])


That seems to work, it packages the dependencies, for example by 
'python3 setup.py sdist'. I pass this file to the Pipeline using 
--setup_file, but I have no luck using the module. Though the script 
is executed, it fails once I try to open a channel using (DoFn.setup):


  def setup(self):
    self.channel = grpc.insecure_channel(self.address)
    self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

with exception ModuleNotFoundError: No module named 'service_pb2_grpc'.

Am I doing something obviously wrong?

 Jan



Importing dependencies of Python Pipeline

2021-09-23 Thread Jan Lukavský

Hi,

I'm facing issues importing dependencies of my Python Pipeline. I intend 
to use gRPC to communicate with remote RPC service, hence I have the 
following project structure:


script.py

    | service_pb2.py

    | service_pb2_grpc.py

I created setup.py with something like

setup(name='...',
  version='1.0',
  description='...',
  py_modules=['service_pb2', 'service_pb2_grpc'])


That seems to work, it packages the dependencies, for example by 
'python3 setup.py sdist'. I pass this file to the Pipeline using 
--setup_file, but I have no luck using the module. Though the script is 
executed, it fails once I try to open a channel using (DoFn.setup):


  def setup(self):
    self.channel = grpc.insecure_channel(self.address)
    self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

with exception ModuleNotFoundError: No module named 'service_pb2_grpc'.

Am I doing something obviously wrong?

 Jan



Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

2021-09-22 Thread Jan Lukavský

On 9/22/21 11:42 AM, Wu, Huijun wrote:


Hi Lukavský,

Appreciated for your kind help.

I want to understand more about the method commitOffsetsInFinalize 
and checkpoints in Apache Beam.


(1) Regarding commitOffsetsInFinalize

The doc said “Finalized offsets are committed to Kafka.” I want to 
know explicitly when the offset will be committed to Kafka?


For instance if our pipeline has 4 steps:

Read data from Kafka -> transformation on the raw data -> Send data to 
external service by http call -> Persist http response into DB


If I enable commitOffsetsInFinalize, is the offset committed 
immediately after the data is read out of Kafka, or is it committed 
only after the HTTP responses are persisted into the DB successfully?

If, at the third step, the external service is down and we fail to make 
the HTTP call, how can we pause the whole streaming process until the 
external service recovers? (That led me to look for a way to 
manually commit offsets back to Kafka.)

If we restart the dataflow, will data be lost that has completed 
steps 1 and 2 but not yet step 3?


The Apache Beam model uses a concept called "bundles". A bundle is a set 
of records which are processed atomically - either all elements of the 
bundle are processed, or none of them are. commitOffsetsInFinalize 
commits offsets once a bundle is "finalized" - that is, once its output 
is safely persisted. Where it is persisted is not defined by the model 
but is runner-dependent; what you can rely on is that no data loss 
should be possible.
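
If your own code needs a similar "after the bundle is durably committed" hook, the Java SDK exposes DoFn.BundleFinalizer. A hedged sketch (runner support varies, and the callback is best-effort):

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical DoFn that acknowledges records to an external system only after the
// runner reports the containing bundle as committed - similar in spirit to what
// KafkaIO does with commitOffsetsInFinalize.
public class AckAfterCommitFn extends DoFn<String, String> {

  @ProcessElement
  public void process(
      @Element String element,
      OutputReceiver<String> out,
      BundleFinalizer finalizer) {
    out.output(element);
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)), // expiry for the callback
        () -> {
          // Best-effort: runs once the bundle's results are durably committed.
          // Acknowledge the processed records to the external service here.
        });
  }
}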


(2) Regarding checkpoint

I am using Spark as my underlying runner for the Apache Beam pipeline. In 
Spark, I can use the following to create a checkpoint:

spark.sparkContext.setCheckpointDir(path)
rdd1.checkpoint()

But in Apache Beam I never explicitly deal with checkpoints. When and 
how are the checkpoints made behind the scenes, and how does this affect 
commitOffsetsInFinalize?
commitOffsetsInFinalize?


As mentioned above, _how_ the atomicity is achieved depends on the 
runner. I'm not a SparkRunner expert, so I cannot tell for sure how it 
works in Spark, but the good news is that it should not matter to the 
user code.


 Jan


Best Regards,

*From: *Jan Lukavský 
*Reply-To: *"user@beam.apache.org" 
*Date: *Wednesday, September 22, 2021 at 4:39 PM
*To: *"user@beam.apache.org" 
*Subject: *Re: [Question] How to manually commit Kafka offset in 
Apache Beam at the end of specific doFun execution


External Email

Hi,

are you using KafkaIO? If yes, then you can enable offset commits on 
bundle finalization via [1]. Note, on the other hand, that KafkaIO stores 
offsets in the checkpoint, so - provided you run your Beam pipeline on a 
runner with checkpointing enabled - it should not be necessary to 
commit offsets to Kafka just for the sake of exactly-once processing. 
That should be guaranteed even without it.


Please don't hesitate to ask if you have any more questions.

Best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--


On 9/22/21 7:46 AM, Wu, Huijun wrote:

Hi All,

I have encountered a problem which seems common, but I couldn’t find
any working solution online (Stack Overflow, Google search),
so I am asking for help here.

I created a simple Apache Beam streaming pipeline which reads data
from Kafka, does some processing and persists the result by calling
an external service's API. I want to make sure no data is lost
during a pipeline restart or failure, so I want to manually commit
the record offset to Kafka after I successfully call the API at
the end of a specific DoFn execution.

In my previous Kafka experience, I know that by using Kafka
Consumer's below API, I am able to manually commit the record
offset to Kafka.

consumer.commitSync(currentOffsets);

There is a setting to turn off auto commit in the KafkaIO setup;
however, I didn't find any working solution or interface exposed
to manually commit offsets in Apache Beam, as there seems to be no way I
can access the consumer in a DoFn. The project I am working on is
using Apache Beam 2.16.0 due to some historical reasons, but I am
happy to upgrade it if a later version provides a working solution.

I would really appreciate it if some expert could kindly share
some hints with sample code.

Best Regards,





Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

2021-09-22 Thread Jan Lukavský

Hi,

are you using KafkaIO? If yes, then you can enable offset commits on 
bundle finalization via [1]. Note, on the other hand, that KafkaIO stores 
offsets in the checkpoint, so - provided you run your Beam pipeline on a 
runner with checkpointing enabled - it should not be necessary to commit 
offsets to Kafka just for the sake of exactly-once processing. That 
should be guaranteed even without it.


Please don't hesitate to ask if you have any more questions.

Best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
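
A minimal sketch of [1] in use (broker, topic, group id and deserializers are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // required for offset commits

    PCollection<KV<String, String>> records =
        p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092") // placeholder
                .withTopic("input-topic")            // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(consumerConfig)
                // Commit offsets back to Kafka when a bundle is finalized, i.e. after
                // its outputs have been durably checkpointed by the runner.
                .commitOffsetsInFinalize()
                .withoutMetadata());

    // ... processing and the call to the external API would follow here ...
    p.run();
  }
}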


On 9/22/21 7:46 AM, Wu, Huijun wrote:


Hi All,

I have encountered a problem which seems common, but I couldn’t find any 
working solution online (Stack Overflow, Google search), so I am 
asking for help here.


I created a simple Apache Beam streaming pipeline which reads data from 
Kafka, does some processing and persists the result by calling an 
external service's API. I want to make sure no data is lost during a 
pipeline restart or failure, so I want to manually commit the record 
offset to Kafka after I successfully call the API at the end of a 
specific DoFn execution.


In my previous Kafka experience, I know that by using Kafka Consumer's 
below API, I am able to manually commit the record offset to Kafka.


consumer.commitSync(currentOffsets);

There is a setting to turn off auto commit in the KafkaIO setup; however, 
I didn't find any working solution or interface exposed to manually 
commit offsets in Apache Beam, as there seems to be no way I can access the 
consumer in a DoFn. The project I am working on is using Apache Beam 
2.16.0 due to some historical reasons, but I am happy to upgrade it if a 
later version provides a working solution.

I would really appreciate it if some expert could kindly share some 
hints with sample code.


Best Regards,



Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-15 Thread Jan Lukavský

Hi Sandeep,

-user@flink <mailto:u...@flink.apache.org>, as this is a Beam related 
question.


Having 25 MB/sec spread over 5 keys (assuming uniform load) yields ~1.5 GiB per 
key per 5-minute window. Please note that the compression happens _after_ the 
buffering in state; the state is kept uncompressed. Another note is that 
Flink cannot guarantee that every TaskManager will get exactly one 
shard (assuming you are using parallelism equal to the number of shards, 
which is 5).

On the other hand, 32 GiB off-heap for RocksDB should be more than 
enough. I'm no expert on either Avro (assuming that is what the 
GenericRecord comes from) or Parquet, but it is quite likely 
that Parquet uses some extra memory to build the data files.

Can you try:

 a) verifying that you use an efficient Coder for the GenericRecord? Maybe 
you can try to switch to Row and use RowCoder (see the sketch below)?

 b) swapping RocksDB for FsStateBackend and keeping all the heap for the 
TaskManager?

 c) increasing the number of TaskManagers to something like 10 (although 
that means some of them might be unused - just to see whether it helps)?


 Jan
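
A sketch of suggestion a) - pinning an explicit Avro coder on the GenericRecord collection so the buffered state stays compact and no slower fallback coder is picked (switching to Row/RowCoder, as suggested above, is the alternative; 'schema' is assumed to be the same Avro schema passed to ParquetIO.sink):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.values.PCollection;

class ParquetRecordCoder {
  // 'records' is the PCollection<GenericRecord> feeding FileIO/ParquetIO.
  static PCollection<GenericRecord> withExplicitCoder(
      PCollection<GenericRecord> records, Schema schema) {
    return records.setCoder(AvroCoder.of(schema));
  }
}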

On 9/15/21 7:55 PM, Kathula, Sandeep wrote:


Hi Jan,

Thanks for the reply. To answer your questions:

 1. We are using RocksDB as the state backend.
 2. We are using a 10-minute checkpointing interval.
 3. We are getting at most 5,000 records per second, each around 5 KB in
size, from Kafka (25 MB/sec), which we are trying to write to
S3. But since we are writing to S3 in Parquet format - 5 files once
every 5 minutes - it is compressed, and we estimate each file to
be around 100-150 MB in size.

We even tried with 6 pods, each with 4 CPU and 64 GB of memory (32 GB 
going to off-heap for RocksDB), but we are still not able to write bigger files.


Thanks,

Sandeep

*From: *Jan Lukavský 
*Date: *Tuesday, September 14, 2021 at 10:47 AM
*To: *"user@beam.apache.org" 
*Cc: *user 
*Subject: *Re: Beam with Flink runner - Issues when writing to S3 in 
Parquet Format


This email is from an external sender.

Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, 
so this might create a high pressure on state backend and/or heap, 
which could result in suboptimal performance. Due to the "connection 
loss" and timeout exceptions you describe I'd suppose there might be a 
lot of GC pressure.


 Jan

On 9/14/21 5:20 PM, Kathula, Sandeep wrote:

Hi,

   We have a simple Beam application which reads from Kafka, converts to
Parquet and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12).
We have a fixed window of 5 minutes after conversion to
PCollection<GenericRecord> and then write to S3.
We have around 320 columns in our data. Our intention is to write
large files of 128 MB or more so that we won't have a small-file
problem when reading back from Hive. But from what we observed it is
taking too much memory to write to S3 (giving 8 GB of heap is not
enough to write 50 MB files and it goes OOM). When I increase the heap
to 32 GB it then takes a lot of time to write records to S3.

For instance it takes:

20 MB file - 30 sec

50 MB file - 1 min 16 sec

75 MB file - 2 min 15 sec

83 MB file - 2 min 40 sec

Code block to write to S3:

PCollection<GenericRecord> parquetRecord = …….

parquetRecord.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath.isEmpty() ? outputPath() : outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));

We are also getting different exceptions like:

 1. UserCodeException:


Caused by: org.apache.beam.sdk.util.UserCodeException:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator

at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)

at

com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)

at

org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)

at

org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)

at

org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)

at

org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)

at

org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)

at

org.apache.flink.streaming.runtime.tas

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Jan Lukavský

Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, so 
this might create a high pressure on state backend and/or heap, which 
could result in suboptimal performance. Due to the "connection loss" and 
timeout exceptions you describe I'd suppose there might be a lot of GC 
pressure.


 Jan

On 9/14/21 5:20 PM, Kathula, Sandeep wrote:


Hi,

We have a simple Beam application which reads from Kafka, converts to 
Parquet and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). 
We have a fixed window of 5 minutes after conversion to 
PCollection<GenericRecord> and then write to S3. We have around 320 
columns in our data. Our intention is to write large files of 128 MB 
or more so that we won't have a small-file problem when reading back 
from Hive. But from what we observed it is taking too much memory to 
write to S3 (giving 8 GB of heap is not enough to write 50 MB files 
and it goes OOM). When I increase the heap to 32 GB it then takes a 
lot of time to write records to S3.


For instance it takes:

20 MB file - 30 sec

50 MB file - 1 min 16 sec

75 MB file - 2 min 15 sec

83 MB file - 2 min 40 sec

Code block to write to S3:

PCollection<GenericRecord> parquetRecord = …….

parquetRecord.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath.isEmpty() ? outputPath() : outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));

We are also getting different exceptions like:

 1. UserCodeException:


Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator


at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)


at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)


at java.lang.Iterable.forEach(Iterable.java:75)

at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.col

Re: Using Beam to generate unique ids with unbounded sources

2021-09-10 Thread Jan Lukavský

Hi Cristian,

I think that the backlog is not going to be 100% reliable for this use case. 
A more robust approach would probably be to fetch the endOffsets from Kafka 
when submitting the job (or at an appropriate time, depending on how you get 
updates to the state topic, to make sure that you cannot miss anything) and 
then compare these with the offset in the KafkaRecord in 
getTimestampForRecord. After all partitions reach their end offset, that 
should be the point at which you can advance the watermark to infinity.
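A minimal sketch of that end-offset lookup using the plain Kafka consumer API
(the bootstrap servers and topic are parameters to fill in):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

class EndOffsets {
  // Snapshot the end offset of every partition of the state topic before
  // submitting the job; the timestamp policy can then compare the offsets of
  // incoming KafkaRecords against this snapshot.
  static Map<TopicPartition, Long> endOffsets(String bootstrapServers, String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      List<TopicPartition> partitions =
          consumer.partitionsFor(topic).stream()
              .map(p -> new TopicPartition(p.topic(), p.partition()))
              .collect(Collectors.toList());
      return consumer.endOffsets(partitions);
    }
  }
}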


 Jan

On 9/10/21 9:14 AM, Cristian Constantinescu wrote:

Hi Jan and Luke,

Sorry for the late reply.

@Jan
I ended up implementing a timestamp policy like below

    private class AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy<K, V>
            extends TimestampPolicy<K, V> {

        protected Instant currentWatermark;

        public AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy(
                Optional<Instant> previousWatermark) {
            currentWatermark =
                    previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(
                TimestampPolicy.PartitionContext ctx, KafkaRecord<K, V> record) {
            currentWatermark = new Instant(record.getTimestamp());
            return currentWatermark;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            if (ctx.getMessageBacklog() == 0) {
                // The reader is caught up. May need to advance the watermark.
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            } // else, there is backlog (or it is unknown). Do not advance the watermark.
            return currentWatermark;
        }
    }

This seems to work most of the time, but sometimes when using exactly 
once semantics with KafkaIO and Flink ctx.getMessageBacklog() is 
always > 0, so the watermark is not advanced to 
BoundedWindow.TIMESTAMP_MAX_VALUE.


I'll have to find a reliable way to reproduce that though and make a 
sample project.


@Luke
That could be a solution, however I prefer to keep state in Kafka instead of 
hashing, because there are cases where the id of (A1, B1, C1) is the same as 
the id for (A2, B1, C1) because A1 became A2 after some time (like a person 
legally changing their name, or a financial security changing its symbol or 
CUSIP, etc.).


Thank you both for your suggestions and guidance.

Cheers,
Cristian

On Mon, Aug 9, 2021 at 6:32 PM Luke Cwik <mailto:lc...@google.com>> wrote:


You could look at using a cryptographic hashing function such as
sha512 [1].

You would take the record (A1, B1, C1) encode it and pass it to
the hashing function to generate a binary hash which you could
then convert back to string via an encoding such as hex. The odds
of getting a collision are astronomically small (like you are more
likely to have a random bit flip due to a cosmic ray than for a
collision to happen).

This way you would never need to restore the ids from a previous
run by looking them up from an external source.

1:

https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java

<https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java>
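A minimal sketch of that idea (assuming the record has already been encoded
into a single string such as "A1|B1|C1"):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class DeterministicId {
  // Derive a stable id from the encoded record: the same input always yields
  // the same id, so previously assigned ids never need to be restored from an
  // external source.
  static String idFor(String encodedRecord) throws NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance("SHA-512");
    byte[] hash = digest.digest(encodedRecord.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder(hash.length * 2);
    for (byte b : hash) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }
}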

On Thu, Jul 22, 2021 at 7:40 AM Jan Lukavský mailto:je...@seznam.cz>> wrote:

Hi Cristian,

I didn't try that, so I'm not 100% sure it would work, but you
probably
could try using custom timestamp policy for the KafkaIO, which
will
shift the timestamp to BoundedWindow.TIMESTAMP_MAX_VALUE, once
you know
you reached head of the state topic. That would probably
require reading
the end offsets before running the Pipeline. This should turn
the source
into bounded source effectively.

  Jan

[1]

https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-

<https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->

On 7/22/21 2:14 PM, Cristian Constantinescu wrote:
> Hi All,
>
> I would like to know if there's a suggested pattern for the
below
> scenario. TL;DR: reading state from Kafka.
>
> I have a scenario where I'm listening to a kafka topic and
generate a
> unique id based on the properties of the incoming item.
Then, I output
> the result to another kafka topic. The tricky part is that
when the
> pipeline is restarted, I have to read the output topic and
build up
> the ids state, this way if I see an item that was already
given an id,
>

Re: [GENERAL QUESTION] How independent are worker nodes

2021-09-07 Thread Jan Lukavský

Hi Ana,

what you describe sounds like logical grouping to me. For example - when 
Beam runs a stateful operation (DoFn), every record has to be associated 
with a _key_. All records with the same key are then processed by the 
same worker. If you have some resources that need to be downloaded 
(cached) from outside the Pipeline, one option would be to use a 
stateful DoFn, which would look into its local cache (held in state) 
and download the required resource if it does not have it (or if it is 
stale). There would probably need to be more logic around freeing the 
state, but I'll leave that out for now.


Would that work for your case?
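A rough sketch of that idea, assuming the element key identifies the resource
(the Resource type and the download/process methods are placeholders):

import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.state.StateId;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Placeholder for whatever needs to be fetched from outside the pipeline.
class Resource implements java.io.Serializable {}

class CachedResourceFn extends DoFn<KV<String, String>, String> {

  @StateId("cached")
  private final StateSpec<ValueState<Resource>> cachedSpec =
      StateSpecs.value(SerializableCoder.of(Resource.class));

  @ProcessElement
  public void processElement(
      ProcessContext ctx, @StateId("cached") ValueState<Resource> cached) {
    Resource resource = cached.read();
    if (resource == null) {
      resource = download(ctx.element().getKey()); // fetch once per key
      cached.write(resource);
    }
    ctx.output(process(ctx.element().getValue(), resource));
  }

  private Resource download(String key) { return new Resource(); }          // placeholder
  private String process(String value, Resource resource) { return value; } // placeholder
}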

 Jan

On 9/7/21 7:12 PM, Ana Markovic wrote:

Hi Jan,

Thanks for the fast reply! I came across an example that I wanted to 
recreate in Beam, and I'm sharing the link below. Generally speaking, 
nodes keep their favourite words and accept only jobs that involve 
those favourites. This is a simple example but could be beneficial in 
processing large pieces of data (for example, software repositories), 
where nodes could work on the repositories they already processed (and 
have some files already downloaded) and avoid downloading unnecessary 
repository contents if another node already has them. This could be 
enabled by allowing nodes to check their internal state and decide if 
they want to accept/reject a certain repository as a job. I know that 
the "more complicated" example might be a far fetch, but I wanted to 
give you more context on what I'd want to know about Beam.


Thanks for all the insights!

Best,
Ana

[1] 
https://github.com/crossflowlabs/crossflow/tree/master/org.crossflow.tests/src/org/crossflow/tests/opinionated 
<https://github.com/crossflowlabs/crossflow/tree/master/org.crossflow.tests/src/org/crossflow/tests/opinionated>



On Tue, 7 Sept 2021 at 13:57, Jan Lukavský <mailto:je...@seznam.cz>> wrote:


Hi Ana,

in general, worker nodes do not share any state, and cannot
themselves decide which work to accept and which to reject. How
the work is distributed to downstream processing is defined by a
runner, not the Beam model. On the other hand, what you ask for
might be possibly accomplished using a grouping operation - either
a GroupByKey or a stateful DoFn might help you with that. Can you
further describe your intent?

Best,

 Jan

On 9/7/21 12:32 PM, Ana Markovic wrote:

To whom this may concern,

I've been looking into polyglot data processing frameworks
recently, and I read Beam's documentation as well as developed a
few examples to get some hands-on experience. I've been
wondering, and I haven't found this in the documentation, is
there a way to set up worker nodes so they are "opinionated" or
"smart" in a sense that they can decide for themselves which jobs
they will perform? For example, in a word count example, an
opinionated worker node could only decide to monitor
occurrences of a specific word if it's among the node's favourite
words.

I hope I explained it well, but please let me know if more
details are needed to answer this question.

Thankful in advance,
Ana


--
Best,
Ana


Re: [GENERAL QUESTION] How independent are worker nodes

2021-09-07 Thread Jan Lukavský

Hi Ana,

in general, worker nodes do not share any state, and cannot themselves 
decide which work to accept and which to reject. How the work is 
distributed to downstream processing is defined by a runner, not the 
Beam model. On the other hand, what you ask for might be possibly 
accomplished using a grouping operation - either a GroupByKey or a 
stateful DoFn might help you with that. Can you further describe your 
intent?


Best,

 Jan

On 9/7/21 12:32 PM, Ana Markovic wrote:

To whom this may concern,

I've been looking into polyglot data processing frameworks recently, 
and I read Beam's documentation as well as developed a few examples to 
get some hands-on experience. I've been wondering, and I haven't found 
this in the documentation, is there a way to set up worker nodes so 
they are "opinionated" or "smart" in a sense that they can decide for 
themselves which jobs they will perform? For example, in a word count 
example, an opinionated worker node could only decide to monitor 
occurrences of a specific word if it's among the node's favourite words.


I hope I explained it well, but please let me know if more details are 
needed to answer this question.


Thankful in advance,
Ana


Re: Kafka IO using Python 3.8, Beam 2.31, and Flink 1.13; NoClassDefFoundError HpackDecoder

2021-08-30 Thread Jan Lukavský

Hi Jeremy,

+dev <mailto:d...@beam.apache.org>, as this might be interesting for the 
dev mailing list as well.


Couple of questions:

 a) why do you need to specify the default environment to the JobServer? 
That should be done via the PipelineOptions of the SDK that you use for 
building your Pipeline, or is there any other reason for that?


 b) regarding the NoClassDefFoundError - would the --flink_submit_uber_jar 
flag help? See [1]


 c) for [BEAM-12814] - that definitely has value for non-Java runners, 
but when it comes to Flink, wouldn't the EMBEDDED environment be preferred? 
That way you would not have to configure anything at all. I'm not sure 
if that works in cluster mode; it works with a local JM, and I'd _suspect_ 
that it might work for a cluster as well. Did you test it and it did not work?


 Jan

[1] 
https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/runners/portability/flink_runner.py#L51


On 8/28/21 5:52 PM, Jeremy Lewi wrote:
I filed https://issues.apache.org/jira/browse/BEAM-12814 to support external 
environments for the Java SDK harness to better support K8s.


On Sat, Aug 28, 2021 at 8:52 AM Jeremy Lewi <mailto:jeremy.l...@primer.ai>> wrote:


Hi Folks,

Thank you so much for all your help. I was able to get this
working although I had to hack the python SDK to work around the
issue with connecting to a remote expansion service mentioned in
the other thread

<https://lists.apache.org/x/thread.html/r6c02f6c80d35929a46587ac5d6662ca2e5d8997ae6adfb5902314a35@%3Cuser.beam.apache.org%3E>.

Here's a summary of everything I had to do:

  * I built from the 2.33 release branch to pick up the mentioned fixes.
  * To deal with NoClassDefFoundErrors I ended up baking the Beam job server
    jar into the Flink workers.
      o I'm still not quite sure why the jar isn't being staged correctly,
        but I'll have to dig into it further.
  * I set up my taskmanagers to run docker-in-docker so they could use the
    docker environment for the Java SDK harness.
  * I ran the expansion service as a separate service from the job server so
    that I could set the options to control the default environment and pass
    the use_deprecated_read flag.


J

On Fri, Aug 27, 2021 at 7:16 PM Jeremy Lewi mailto:jeremy.l...@primer.ai>> wrote:

It looks to me like https://github.com/apache/beam/pull/15082 added the
ability to configure the default environment to the main entrypoint of the
expansion service, but not to the JobServer:

https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260

Should I file a JIRA for this?

Thanks.
J


On Fri, Aug 27, 2021 at 12:36 PM Jeremy Lewi
mailto:jeremy.l...@primer.ai>> wrote:

Thank you very much for the pointers. I'm working on
getting the code built from 2.33 branch and trying that out.

J

On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský
mailto:je...@seznam.cz>> wrote:

Hi Jeremy,

the fix for expansion service enables specifying
ExperimentalOptions [1] and PortablePipelineOptions
[2], so you can specify default environment for the
expansion. However ... Java SDK harness does not have
the "work_pool" implementation that python SDK harness
has. So I think that the correct environment for the
expansion would be either DOCKER (which is a pain in
kubernetes) or PROCESS - that requires building custom
flink docker image for TaskManager that includes the
binaries from beam Java SDK image (/opt/apache/beam).

I didn't test if EMBEDDED environment would work as
well, you might try it. That would mean that the
expansion will be completely inlined inside the
TaskManager process.

 Jan

[1]

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java

<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java>

[2]

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/Port

Re: Kafka IO using Python 3.8, Beam 2.31, and Flink 1.13; NoClassDefFoundError HpackDecoder

2021-08-26 Thread Jan Lukavský

Hi Jeremy,

the fix for expansion service enables specifying ExperimentalOptions [1] 
and PortablePipelineOptions [2], so you can specify default environment 
for the expansion. However ... Java SDK harness does not have the 
"work_pool" implementation that python SDK harness has. So I think that 
the correct environment for the expansion would be either DOCKER (which 
is a pain in kubernetes) or PROCESS - that requires building custom 
flink docker image for TaskManager that includes the binaries from beam 
Java SDK image (/opt/apache/beam).


I didn't test if EMBEDDED environment would work as well, you might try 
it. That would mean that the expansion will be completely inlined inside 
the TaskManager process.
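For completeness, a sketch of what setting the default environment via [2]
can look like (the PROCESS boot command below assumes the Beam Java SDK
binaries have been baked into the TaskManager image under /opt/apache/beam):

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

class ExpansionEnvironment {
  static PortablePipelineOptions processEnvironment() {
    PortablePipelineOptions options =
        PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
    // Run the expanded Java transforms as a local process instead of trying
    // to launch a docker container from within the TaskManager pod.
    options.setDefaultEnvironmentType("PROCESS");
    options.setDefaultEnvironmentConfig("{\"command\": \"/opt/apache/beam/boot\"}");
    return options;
  }
}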


 Jan

[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java


[2] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java


On 8/26/21 3:14 PM, Jeremy Lewi wrote:

HI Jan,

That's very helpful. Do you have a timeline for 2.33? Until then I 
will try building from source.


So if I understand correctly, using 2.33 the solution would be to set 
the use_deprecated_read flag until the issues with SDFs [1] are fixed?


Does the fix for 3 allow specifying a different environment for 
different languages? When running in Kubernetes, I think the 
preferred solution is to use two sidecar containers, one running the 
Python SDK harness and the other running the Java SDK harness. So the 
environment config would need to be different for the two languages.


Thanks
J





On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote:


Hi Jeremy,

unfortunately, there are several bugs affecting KafkaIO with
Python on FlinkRunner in current releases.

 a) there are some limitations to portable SDF support on Flink [1]

 b) the use_deprecated_read flag cannot be passed to
ExpansionService, that is fixed for upcoming 2.32.0 in [2]

 c) primitive Read transform needed for the use_deprecated_read
flag to work is not working properly until 2.33.0, fix was merged
just yesterday, see [3]

Unfortunately, there are no known workarounds. If you can build Beam
from sources, you can try building it from the currently cut release
branch 'release-2.33.0'. It would require building both the Java and
Python SDKs. The alternative would be to wait for the 2.33.0 release
to come out.

Hope this helps. If you have any more questions, I'd be glad to help.

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-11998
<https://issues.apache.org/jira/browse/BEAM-11998>

[2] https://issues.apache.org/jira/browse/BEAM-12538
<https://issues.apache.org/jira/browse/BEAM-12538>

[3] https://issues.apache.org/jira/browse/BEAM-12704
<https://issues.apache.org/jira/browse/BEAM-12704>

On 8/26/21 2:36 AM, Jeremy Lewi wrote:

Is this the same issue as in this thread
https://lists.apache.org/list.html?d...@beam.apache.org:2021-5
<https://lists.apache.org/list.html?d...@beam.apache.org:2021-5>
about specifying the environment to be used in cross-language
transforms.

Is the problem in the taskmanager or expansion service? Are there
environment variables I can override to force it to use an
external environment so I can use a sidecar for the Java SDK harness?

Thanks
J






On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik mailto:lc...@google.com>> wrote:

It is likely that the expansion service is returning a graph
segment saying you execute KafkaIO within a docker
environment which is what Flink is trying to do.

On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi
mailto:jeremy.l...@primer.ai>> wrote:

Hi Folks,

So I tried putting the beam job server on the Flink
JobManager and Taskmanager containers and setting
classloader.resolve-order

<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>to
parent-first.
My taskmanager's are now crashing because it looks like
the Kafka IO transform is trying to run docker and it
can't because its running in a K8s pod. Logs attached.

Why would KafkaIO try to launch docker? Does this have
something to do with the expansion service. The docs

<https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
 make
it seem like the expansion service only runs at job
submission time and only needs to be accessible from the
machine where you are running your python program to
submit the job.

Thanks
J

   

Re: Kafka IO using Python 3.8, Beam 2.31, and Flink 1.13; NoClassDefFoundError HpackDecoder

2021-08-26 Thread Jan Lukavský

Hi Jeremy,

unfortunately, there are several bugs affecting KafkaIO with Python on 
FlinkRunner in current releases.


 a) there are some limitations to portable SDF support on Flink [1]

 b) the use_deprecated_read flag cannot be passed to ExpansionService, 
that is fixed for upcoming 2.32.0 in [2]


 c) primitive Read transform needed for the use_deprecated_read flag to 
work is not working properly until 2.33.0, fix was merged just 
yesterday, see [3]


Unfortunately, there are no known workarounds. If you can build Beam from 
sources, you can try building it from the currently cut release branch 
'release-2.33.0'. It would require building both the Java and Python SDKs. 
The alternative would be to wait for the 2.33.0 release to come out.

Hope this helps. If you have any more questions, I'd be glad to help.

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-11998

[2] https://issues.apache.org/jira/browse/BEAM-12538

[3] https://issues.apache.org/jira/browse/BEAM-12704

On 8/26/21 2:36 AM, Jeremy Lewi wrote:

Is this the same issue as in this thread
https://lists.apache.org/list.html?d...@beam.apache.org:2021-5 


about specifying the environment to be used in cross-language transforms.

Is the problem in the taskmanager or expansion service? Are there 
environment variables I can override to force it to use an external 
environment so I can use a sidecar for the Java SDK harness?


Thanks
J






On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik > wrote:


It is likely that the expansion service is returning a graph
segment saying you execute KafkaIO within a docker environment
which is what Flink is trying to do.

On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi mailto:jeremy.l...@primer.ai>> wrote:

Hi Folks,

So I tried putting the beam job server on the Flink JobManager
and Taskmanager containers and setting classloader.resolve-order
to parent-first.
My taskmanagers are now crashing because it looks like the
Kafka IO transform is trying to run docker and it can't
because it's running in a K8s pod. Logs attached.

Why would KafkaIO try to launch docker? Does this have
something to do with the expansion service? The docs make
it seem like the expansion service only runs at job submission
time and only needs to be accessible from the machine where
you are running your Python program to submit the job.

Thanks
J

On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi
mailto:jeremy.l...@primer.ai>> wrote:

Hi Luke,

Thanks. I've attached the full stack trace. When I reran
it gave me an error about a different class.

I checked the beam job server jar and as far as I can tell
the classes are present. So seems like a potential issue
with the classpath or staging of JARs on the task managers.

Does anyone happen to know how jars get staged onto Flink
taskmanagers? On the jobmanager I was able to locate the
jar in a /tmp directory but I couldn't figure out how it
was getting staged on taskmanagers.

I tried baking the job server jar into the Flink
containers. That gave me an IllegalAccessError. I assume,
per the Flink docs, this indicates a dependency conflict
between the system JARs and the application JARs.

With the portable runner, is there any way to disable
uploading of the JAR and instead rely on the JARs being
baked into the docker container?

Thanks
J

On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik
mailto:lc...@google.com>> wrote:

Both those classes exist in
beam-vendor-grpc-1_36_0-0.1.jar:

lcwik@lcwik:~/Downloads$ jar tf
beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack

org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class

org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class

org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class

org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class

org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.cl

Re: Using Beam to generate unique ids with unbounded sources

2021-07-22 Thread Jan Lukavský

Hi Cristian,

I didn't try that, so I'm not 100% sure it would work, but you probably 
could try using a custom timestamp policy for the KafkaIO, which will shift 
the timestamp to BoundedWindow.TIMESTAMP_MAX_VALUE once you know you have 
reached the head of the state topic. That would probably require reading 
the end offsets before running the Pipeline. This should effectively turn 
the source into a bounded source.


 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-


On 7/22/21 2:14 PM, Cristian Constantinescu wrote:

Hi All,

I would like to know if there's a suggested pattern for the below 
scenario. TL;DR: reading state from Kafka.


I have a scenario where I'm listening to a kafka topic and generate a 
unique id based on the properties of the incoming item. Then, I output 
the result to another kafka topic. The tricky part is that when the 
pipeline is restarted, I have to read the output topic and build up 
the ids state, this way if I see an item that was already given an id, 
I give the same id back and do not generate a new one.


For example:
Input topic -> Output topic
(A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")
(A1, B1, C2) -> (A1, B1, C2, Random string "ID 2")
pipeline is restarted
(A3, B3, C3) -> (A3, B3, C3, Random string "ID 3")
(A1, B1, C1) -> (A1, B1, C1, Random string "ID 1") <-- because we've 
already seen (A1, B1, C1) before


I can't really use any type of windows except the global ones, as I 
need to join on all the items of the output topic (the one with the 
already generated ids).


Right now, I flatten both input and output topics and I use a trigger 
on the global window 
AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardSeconds(10) 
then group by properties (A,B,C). Once that is done, I look through 
the grouped rows and see if any one of them has an id already 
generated. If yes, all the other rows get this id and the id is saved 
in the ParDo's state for the future messages. If no, then generate a 
new id.


My solution seems to work. Kind of...

This puts a delay of 10s on all the incoming messages. I'd prefer it 
wouldn't be the case. I would like to read the output topic at the 
start of the pipeline, build the state, then start processing the 
input topic. Since the output topic will be stale until I start 
processing the input topic again, it effectively is a 
bounded collection. Unfortunately because it's kafkaIO, it's still 
considered an unbounded source, which mainly means that Wait.on() this 
collection waits forever. (Note: I've read the notes in the 
documentation [1] but either do not understand them or didn't take the 
appropriate steps for wait.on to trigger properly.)


I have also tried to window the output topic in a session window with 
a one second gap. Basically, if I don't get any item for 1 second, it 
means that I finished reading the output topic and can start 
processing the input topic. Unfortunately Wait.on() doesn't work for 
Session Windows.


Furthermore, I don't think side inputs work for this problem. First 
because I'm not sure how to create the side input from an unbounded 
source. Second because the side input needs to be updated when a new 
id is generated.


I would appreciate any thoughts or ideas to elegantly solve this problem.

Thanks,
Cristian

[1] 
https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html 



Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský

Hi Eddy,

you are probably hitting a not-yet-discovered bug in the SDF implementation 
in FlinkRunner that (under some currently unknown conditions) seems to 
stop advancing the watermark. This has been observed in one other 
instance (that I'm aware of). I think we don't yet have a tracking JIRA 
for that; would you mind filing it? It would be awesome if you could 
include an estimate of the messages-per-second throughput that causes the 
issue in your case.


+Tobias Kaymak <mailto:tobias.kay...@ricardo.ch>

Tobias, could you please confirm that the case you had, where Flink 
stopped advancing the watermark, resembled this one?


Thanks.

 Jan

On 6/14/21 4:11 PM, Eddy G wrote:

Hi Jan,

I've added --experiments=use_deprecated_read and it seems to work flawlessly 
(with my current Window and the one proposed by Evan).

Why is this? Do Splittable DoFns now break existing implementations? Are there 
any posts about possible breaking changes?

On 2021/06/14 13:19:39, Jan Lukavský  wrote:

Hi Eddy,

answers inline.

On 6/14/21 3:05 PM, Eddy G wrote:

Hi Jan,

Thanks for replying so fast!

Regarding your questions,

- "Does your data get buffered in a state?"
Yes, I do have a state within a stage prior ParquetIO writing together with a 
Timer with PROCESSING_TIME.

The stage which contains the state does send bytes to the next one which is the 
ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
clearing the state. This however does work under normal circumstances without 
having too much data queued waiting to be processed.

OK, this suggests that the watermark is for some reason "stuck". If you
have checkpoints enabled, you should see the size of the checkpoint grow
over time.

- "Do you see watermark being updated in your Flink WebUI?"
The stages that do have a watermark don't get updated. The same watermark value 
has been constant since the pipeline started.

If no lateness is set, any late data should be admitted right?

If no lateness is set, it means allowed lateness of Duration.ZERO, which
means that data that arrive after end-of-window will be dropped.

Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, neither 
in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a Dataflow 
specific metric right?

Should not be Dataflow specific. But if you don't see it, it means it
could be zero. So, we can rule this out.

We're using KinesisIO for reading messages.

Kinesis uses UnboundedSource, which is expanded to SDF starting from
Beam 2.25.0. The flag should change that as well. Can you try
--experiments=use_deprecated_read and see if your Pipeline DAG changes
(it should not contain an Impulse transform at the beginning) and whether it
solves your issues?

On 2021/06/14 12:48:58, Jan Lukavský  wrote:

Hi Eddy,

does your data get buffered in a state - e.g. does the size of the state
grow over time? Do you see watermark being updated in your Flink WebUI?
When a stateful operation (and GroupByKey is a stateful operation) does
not output any data, the first place to look at is if watermark
correctly progresses. If it does not progress, then the input data must
be buffered in state and the size of the state should grow over time. If
it progresses, then it might be the case, that the data is too late
after the watermark (the watermark estimator might need tuning) and the
data gets dropped (note you don't set any allowed lateness, which
_might_ cause issues). You could see if your pipeline drops data in
"droppedDueToLateness" metric. The size of you state would not grow much
in that situation.

Another hint - If you use KafkaIO, try to disable SDF wrapper for it
using "--experiments=use_deprecated_read" on command line (which you
then must pass to PipelineOptionsFactory). There is some suspicion that
SDF wrapper for Kafka might not work as expected in certain situations
with Flink.

Please feel free to share any results,

     Jan

On 6/14/21 1:39 PM, Eddy G wrote:

As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late 
data (intentionally stopped my consumer so data has been accumulating for 
several days now). Now, with the following Window... I'm using Beam 2.27 and 
Flink 1.12.

   
Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

   FileIO
   .writeDynamic()
   .by(...)
   .via(...)
   .to(...)
   .withNaming(...)
   .withDestinationCoder(StringUtf8Coder.of())
   .withNumShards(options.getNumShards())

it won't send bytes across all stages

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský

Hi Eddy,

answers inline.

On 6/14/21 3:05 PM, Eddy G wrote:

Hi Jan,

Thanks for replying so fast!

Regarding your questions,

- "Does your data get buffered in a state?"
Yes, I do have a state within a stage prior ParquetIO writing together with a 
Timer with PROCESSING_TIME.

The stage which contains the state does send bytes to the next one which is the 
ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
clearing the state. This however does work under normal circumstances without 
having too much data queued waiting to be processed.
OK, this suggests that the watermark is for some reason "stuck". If you 
have checkpoints enabled, you should see the size of the checkpoint grow 
over time.


- "Do you see watermark being updated in your Flink WebUI?"
The stages that do have a watermark don't get updated. The same watermark value 
has been constant since the pipeline started.

If no lateness is set, any late data should be admitted right?
If no lateness is set, it means allowed lateness of Duration.ZERO, which 
means that data that arrive after end-of-window will be dropped.
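For reference, a sketch of a windowing strategy that keeps late data instead
of dropping it (the one-hour lateness and the element-count late firing are
arbitrary values for illustration):

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

class LateDataWindowing {
  static Window<String> windowWithLateness() {
    return Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                // re-fire the window for elements arriving after the watermark
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        // keep (instead of drop) data up to one hour behind the watermark
        .withAllowedLateness(Duration.standardHours(1))
        .accumulatingFiredPanes();
  }
}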


Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, neither 
in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a Dataflow 
specific metric right?
Should not be Dataflow specific. But if you don't see it, it means it 
could be zero. So, we can rule this out.


We're using KinesisIO for reading messages.
Kinesis uses UnboundedSource, which is expanded to SDF starting from 
Beam 2.25.0. The flag should change that as well. Can you try 
--experiments=use_deprecated_read and see if your Pipeline DAG changes 
(it should not contain an Impulse transform at the beginning) and whether it 
solves your issues?


On 2021/06/14 12:48:58, Jan Lukavský  wrote:

Hi Eddy,

does your data get buffered in a state - e.g. does the size of the state
grow over time? Do you see watermark being updated in your Flink WebUI?
When a stateful operation (and GroupByKey is a stateful operation) does
not output any data, the first place to look at is if watermark
correctly progresses. If it does not progress, then the input data must
be buffered in state and the size of the state should grow over time. If
it progresses, then it might be the case, that the data is too late
after the watermark (the watermark estimator might need tuning) and the
data gets dropped (note you don't set any allowed lateness, which
_might_ cause issues). You could see if your pipeline drops data in
"droppedDueToLateness" metric. The size of you state would not grow much
in that situation.

Another hint - If you use KafkaIO, try to disable SDF wrapper for it
using "--experiments=use_deprecated_read" on command line (which you
then must pass to PipelineOptionsFactory). There is some suspicion that
SDF wrapper for Kafka might not work as expected in certain situations
with Flink.

Please feel free to share any results,

    Jan

On 6/14/21 1:39 PM, Eddy G wrote:

As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late 
data (intentionally stopped my consumer so data has been accumulating for 
several days now). Now, with the following Window... I'm using Beam 2.27 and 
Flink 1.12.

  
Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

  FileIO
  .writeDynamic()
  .by(...)
  .via(...)
  .to(...)
  .withNaming(...)
  .withDestinationCoder(StringUtf8Coder.of())
  .withNumShards(options.getNumShards())

it won't send bytes across all stages so no data is being written, still it 
accumulates in the first stage seen in the image and won't go further than that.

Any reason why this may be happening? Wrong windowing strategy?


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský

Hi Eddy,

does your data get buffered in a state - e.g. does the size of the state 
grow over time? Do you see watermark being updated in your Flink WebUI? 
When a stateful operation (and GroupByKey is a stateful operation) does 
not output any data, the first place to look at is if watermark 
correctly progresses. If it does not progress, then the input data must 
be buffered in state and the size of the state should grow over time. If 
it progresses, then it might be the case, that the data is too late 
after the watermark (the watermark estimator might need tuning) and the 
data gets dropped (note you don't set any allowed lateness, which 
_might_ cause issues). You could see if your pipeline drops data in 
"droppedDueToLateness" metric. The size of you state would not grow much 
in that situation.


Another hint - if you use KafkaIO, try to disable the SDF wrapper for it 
using "--experiments=use_deprecated_read" on the command line (which you 
then must pass to PipelineOptionsFactory; a small sketch follows below). 
There is some suspicion that the SDF wrapper for Kafka might not work as 
expected in certain situations with Flink.
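A small sketch of wiring that flag in when the options are built in code
rather than passed on the command line (FlinkPipelineOptions is used here
only as an example of the concrete options class):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class UseDeprecatedRead {
  static FlinkPipelineOptions fromArgs(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    // Equivalent to passing --experiments=use_deprecated_read on the command
    // line: fall back to the UnboundedSource-based read instead of the SDF wrapper.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_deprecated_read");
    return options;
  }
}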


Please feel free to share any results,

  Jan

On 6/14/21 1:39 PM, Eddy G wrote:

As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late 
data (intentionally stopped my consumer so data has been accumulating for 
several days now). Now, with the following Window... I'm using Beam 2.27 and 
Flink 1.12.

 
Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

 FileIO
 .writeDynamic()
 .by(...)
 .via(...)
 .to(...)
 .withNaming(...)
 .withDestinationCoder(StringUtf8Coder.of())
 .withNumShards(options.getNumShards())

it won't send bytes across all stages so no data is being written, still it 
accumulates in the first stage seen in the image and won't go further than that.

Any reason why this may be happening? Wrong windowing strategy?


Re: [DISCUSS] Drop support for Flink 1.10

2021-05-31 Thread Jan Lukavský

Hi,

+1 to remove the support for 1.10.

 Jan

On 5/28/21 10:00 PM, Ismaël Mejía wrote:

Hello,

With Beam support for Flink 1.13 just merged, it is time to discuss the end 
of support for Flink 1.10, following the agreed policy of supporting only 
the latest three Flink releases [1].

I would like to propose that for Beam 2.31.0 we stop supporting Flink 
1.10 [2].
I prepared a PR for this [3] but of course I wanted to bring the 
subject here
(and to user@) for your attention and in case someone has a different 
opinion or

reason to still support the older version.

WDYT?

Regards,
Ismael

[1] 
https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E 

[2] https://issues.apache.org/jira/browse/BEAM-12281 

[3] https://github.com/apache/beam/pull/14906 





Re: KafkaIO with DirectRunner is creating tons of connections to Kafka Brokers

2021-05-24 Thread Jan Lukavský
:53:50.990 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.018 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.020 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.046 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.048 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.077 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.083 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.156 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.167 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.226 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
[INFO ] 2021-05-24 09:53:51.232 [direct-runner-worker] 
apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0






On 24 May 2021 at 09:35:50, Jan Lukavský (je...@seznam.cz 
<mailto:je...@seznam.cz>) wrote:



Hi Serge,

I posted answer to the SO question, hope that helps. One question - a 
frequent creation of consumers should be expected with DirectRunner, 
but there should be only a limited number of them at a time. Do you 
see many of them present simultaneously? Or are they correctly closed 
and released?


 Jan

On 5/23/21 8:40 AM, Sozonoff Serge wrote:

Hi,

I would like to refer to the following Stackoverflow issue I found.

https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer

I have the very same issue when developing my pipeline. Originally 
the pipeline was bounded and would read and process a CSV whose 
name came from a parameter. After some reading up on various 
patterns for being able to process a new incoming file automatically, 
I added a KafkaIO read to the front of my pipeline to listen for 
messages which contain the name of a file to be processed, and then I 
pass this on to FileIO etc …. As such my pipeline is now unbounded.


My pipeline fails using DirectRunner once we have reached the 
maximum number of open files!! Looking at the logging I see a very 
large number of threads (consumers) which seem to be connecting to 
the Kafka broker which makes no sense. I have a topic with a single 
partition!



So literally 100’s of these. Notice the pool thread numbers and 
Consumer client id's


[INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75, 
groupId=Reader-0_offset_consumer_1203867505_report-processor-] 
Seeking to LATEST offset of partition my_topic-0

…..
[INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor—127, 
groupId=Reader-0_offset_consumer_1204976634_report-processor-] 
Seeking to LATEST offset of partition my_topic-0

….
[INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41, 
groupId=Reader-0_offset_consumer_283343789_report-processor-] 
Seeking to LATEST offset of partition my_topic-0

….
etc ….


So my issue resembles the one described on Stack Overflow, and I can 
confirm that switching to the Flink runner resolves the problem, but 
surely there is an explanation? Is there a known bug with DirectRunner?


Kind thanks,
Serge




Re: KafkaIO with DirectRunner is creating tons of connections to Kafka Brokers

2021-05-24 Thread Jan Lukavský

Hi Serge,

I posted an answer to the SO question; hope that helps. One question - 
frequent creation of consumers should be expected with DirectRunner, but 
there should be only a limited number of them at a time. Do you see many 
of them present simultaneously? Or are they correctly closed and released?


 Jan

On 5/23/21 8:40 AM, Sozonoff Serge wrote:

Hi,

I would like to refer to the following Stackoverflow issue I found.

https://stackoverflow.com/questions/56496611/apache-beam-kafka-consumer-restarted-over-and-over-again#new-answer

I have the very same issue when developing my pipeline. Originally the 
pipeline was bounded and would read and process a CSV whose name 
came from a parameter. After some reading up on various patterns 
for being able to process a new incoming file automatically, I added a 
KafkaIO read to the front of my pipeline to listen for messages which 
contain the name of a file to be processed, and then I pass this on to 
FileIO etc …. As such my pipeline is now unbounded.


My pipeline fails using DirectRunner once we have reached the maximum 
number of open files!! Looking at the logging I see a very large 
number of threads (consumers) which seem to be connecting to the Kafka 
broker which makes no sense. I have a topic with a single partition!



So literally 100’s of these. Notice the pool thread numbers and 
Consumer client id's


[INFO ] 2021-05-23 08:37:56.511 [pool-82-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1203867505_report-processor--75, 
groupId=Reader-0_offset_consumer_1203867505_report-processor-] Seeking 
to LATEST offset of partition my_topic-0

…..
[INFO ] 2021-05-23 08:37:56.516 [pool-134-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_1204976634_report-processor—127, 
groupId=Reader-0_offset_consumer_1204976634_report-processor-] Seeking 
to LATEST offset of partition my_topic-0

….
[INFO ] 2021-05-23 08:37:56.517 [pool-48-thread-1] 
kafka.clients.consumer.internals.SubscriptionState - [Consumer 
clientId=consumer-Reader-0_offset_consumer_283343789_report-processor--41, 
groupId=Reader-0_offset_consumer_283343789_report-processor-] Seeking 
to LATEST offset of partition my_topic-0

….
etc ….


So my issue resembles the one described on Stack Overflow, and I can 
confirm that switching to the Flink runner resolves the problem, but 
surely there is an explanation? Is there a known bug with DirectRunner?


Kind thanks,
Serge




Re: DirectRunner, Fusion, and Triggers

2021-05-17 Thread Jan Lukavský

On 5/17/21 3:46 PM, Bashir Sadjad wrote:

Thanks Jan. Two points:

- I was running all the experiments I reported with 
`--targetParallelism=1` to make sure concurrent threads do not mess up 
the logs.
I think that is what causes what you see. Try to increase the 
parallelism to a number higher than the number of input bundles.
- I have been tracking bundles too (see @StartBundle log messages in 
the mini-example in my previous reply to Kenn).


I see the code, but not the log output. My suspicion would be that you 
see "Start bundle" -> "Debug Input" OR "Debug NEXT", right? If yes, then 
this is expected - processing a bundle produces an "output bundle", 
which is queued into the work queue and is then processed as soon as there 
is a free worker to work on it. Fetching new outputs produces new bundles, 
which are also queued to this queue, which is what causes the interleaving.




So I don't think bundles alone describe what I see. In the 
mini-example, processing of INPUT bundles and NEXT bundles are 
interleaved, e.g., 3 INPUT bundles are processed, then the output of 
those go through NEXT, then a few other INPUT bundles and so on.


Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, 
the input to S2B also has many bundles. However in this case /all/ of 
those bundles are processed first, then they all go through the next 
stages, e.g., the logging S2B' that I mentioned. So there is no 
interleaving of log messages.
GBK is a stateful operation that has to wait for a trigger - in the simple 
batch case the trigger is the end of input, which is why you cannot see 
outputs of GBK being interleaved with reading inputs. All inputs have to be 
read before GBK can proceed and output any bundle downstream.


Regards,

-B

On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote:


Hi Bashir,

the behavior you describe should be expected. DirectRunner splits
the input work into bundles, processing each bundle might result
in zero, one or more new bundles. The executor executes the work
associated with these bundles, enqueuing new bundles into a queue,
until there are no unprocessed bundles left in the queue (that is,
the work has been completely done). It uses a fixed-size thread
pool to consume and execute work associated with these bundles
(the size of which is defined by --targetParallelism), so what
happens is that the processing of bundles of "Sleep" transform and
"Next" transform are interleaved, but not due to fusion, but due
to limited parallelism. If you increase the parallelism beyond the
total number of bundles in your `lines` PCollection, then I think
you would see the result you expect.

Best,

 Jan

On 5/12/21 7:35 PM, Bashir Sadjad wrote:

Thanks Kenn.

On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles mailto:k...@apache.org>> wrote:


On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad
mailto:bas...@google.com>> wrote:

However, if I add a dummy S2' after S2 (i.e.,
S1->S2->S2'->S3) which only prints some log messages for
each record and passes the record to output, then it
seems S2 and S2' are fused. Because the log messages are
interleaved with fetches.


*Q1*: Does DirectRunner do any fusion optimization (e.g.,
like DataflowRunner)? If not by default, is there any way
to enable it?


The Java DirectRunner does not do any fusion optimization.
There's no code to enable :-). It should affect performance
only, not semantics. The DirectRunner is known to have poor
performance, but mostly no one is working on speeding it up
because it is really just for small-scale testing.


Here is a minimal pipeline (with no windowing) that demonstrates
what I mean; maybe I am using the wrong terminology but when I
run this pipeline with DirectRunner (and with
`--targetParallelism=1`) the `DEBUG INPUT` and `DEBUG NEXT`
messages are interleaved. While if there was no fusion, I would
have expected to see all `DEBUG INPUT` messages first and then
all of `DEBUG NEXT`:

Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines =
    pipeline.apply(TextIO.read().from(options.getInputFile()));

PCollection<String> linesDelayed = lines.apply("Sleep",
    ParDo.of(new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("INPUT: Started a new bundle");
      }
      @ProcessElement
      public void processElement(@Element String line,
          OutputReceiver<String> out) throws InterruptedException {
        log.info(String.format("DEBUG INPUT %s", line));
        Thread.sleep(3000);
        out.output(line);
      }

Re: DirectRunner, Fusion, and Triggers

2021-05-17 Thread Jan Lukavský

Hi Bashir,

the behavior you describe should be expected. DirectRunner splits the 
input work into bundles, processing each bundle might result in zero, 
one or more new bundles. The executor executes the work associated with 
these bundles, enqueuing new bundles into a queue, until there are no 
unprocessed bundles left in the queue (that is, the work has been 
completely done). It uses a fixed-size thread pool to consume and 
execute work associated with these bundles (the size of which is defined 
by --targetParallelism), so what happens is that the processing of 
bundles of "Sleep" transform and "Next" transform are interleaved, but 
not due to fusion, but due to limited parallelism. If you increase the 
parallelism beyond the total number of bundles in your `lines` 
PCollection, then I think you would see the result you expect.
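For reference, a small sketch (hypothetical class name) of raising the 
direct runner's parallelism programmatically through DirectOptions, 
equivalent to passing --targetParallelism on the command line:

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DirectParallelismDemo {
  public static void main(String[] args) {
    DirectOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
    options.setRunner(DirectRunner.class);
    // Equivalent to --targetParallelism=8; with parallelism at or above the
    // number of bundles, the interleaving described above should disappear.
    options.setTargetParallelism(8);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the same Sleep -> Next pipeline as in the example below ...
    pipeline.run().waitUntilFinish();
  }
}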


Best,

 Jan

On 5/12/21 7:35 PM, Bashir Sadjad wrote:

Thanks Kenn.

On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:



On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:

However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3)
which only prints some log messages for each record and passes
the record to output, then it seems S2 and S2' are fused.
Because the log messages are interleaved with fetches.


*Q1*: Does DirectRunner do any fusion optimization (e.g., like
DataflowRunner)? If not by default, is there any way to enable it?


The Java DirectRunner does not do any fusion optimization. There's
no code to enable :-). It should affect performance only, not
semantics. The DirectRunner is known to have poor performance, but
mostly no one is working on speeding it up because it is really
just for small-scale testing.


Here is a minimal pipeline (with no windowing) that demonstrates what 
I mean; maybe I am using the wrong terminology but when I run this 
pipeline with DirectRunner (and with `--targetParallelism=1`) the 
`DEBUG INPUT` and `DEBUG NEXT` messages are interleaved. While if 
there was no fusion, I would have expected to see all `DEBUG INPUT` 
messages first and then all of `DEBUG NEXT`:


Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines =
    pipeline.apply(TextIO.read().from(options.getInputFile()));

PCollection<String> linesDelayed = lines.apply("Sleep",
    ParDo.of(new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("INPUT: Started a new bundle");
      }

      @ProcessElement
      public void ProcessElement(@Element String line,
          OutputReceiver<String> out) throws InterruptedException {
        log.info(String.format("DEBUG INPUT %s", line));
        Thread.sleep(3000);
        out.output(line);
      }
    }));

PCollection<String> linesDebug = linesDelayed.apply("Next",
    ParDo.of(new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("NEXT: Started a new bundle");
      }

      @ProcessElement
      public void ProcessElement(@Element String line, OutputReceiver<String> out) {
        log.info(String.format("DEBUG NEXT %s", line));
        out.output(line);
      }
    }));

linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));

PipelineResult result = pipeline.run();
result.waitUntilFinish();

It seems that a few bundles are processed by the `Sleep` transform, then 
they all go through `Next`; then a few more bundles go through `Sleep`, 
then `Next`, and so on.


The other issue is with triggers and creating panes. I have an
extended version of this pipeline where a simplified view of
it is: S1->S2A->GBK->S2B->S3

S1: Like before
S2A: Add a key to the output of S1
GBK: Groups output of S2A to remove duplicate keys
S2B: Similar to S2 above, i.e., fetch deduped URLs and create
Avro records
S3: Same as before

*Q2*: In this case, if I add a dummy S2B' after S2B, the log
messages are /not/ interleaved with resource fetches, i.e., no
fusion is happening. Why? What is different here?


I don't quite understand what the problem is here.


The same log message interleaving does not happen in this case. So, back 
to my original example sketch: log messages of S2' are interleaved with 
S2 (which I thought was because of fusion), but all of the log messages 
of S2B' are printed after all messages of S2B.


*Q3*: Even if I add a similar trigger to the output of S2B,
the Parquet file generation does not start until all of the
fetches are done. Again, what is different here, and why are
intermediate panes not fired while the output of S2B is
being generated?


I think it would help to see how you have configured the ParquetIO
write transform.


I think this is related to the difference between the behaviour of the 
two examples above (i.e., S2' vs. S2B'). If it turns out that is not 
the case, I will create a minimal example including ParquetIO too.


Tha

Re: Getting null pointer exception in a basic setup, don't know why

2021-05-06 Thread Jan Lukavský

Hi Teodor,

can you share (maybe a github link, if you have it in a public repo) the 
implementation of CountSource and Printer? What changed in Beam 2.25.0 
(if I recall correctly) is how the Read transform is translated. It uses SDF 
now, so there might be something that was broken before but that the change 
of the translation revealed. You can check whether your problem disappears 
if you add use_deprecated_read to the experiments in your PipelineOptions. 
See Highlights in [1].
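For illustration, a sketch (hypothetical class name) of enabling the 
experiment programmatically via ExperimentalOptions; passing 
--experiments=use_deprecated_read on the command line should have the 
same effect:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseDeprecatedReadDemo {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Fall back to the pre-2.25.0 Read translation instead of the SDF-based one.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_deprecated_read");

    Pipeline p = Pipeline.create(options);
    // ... apply Read.from(...) and the rest of the pipeline here ...
    p.run().waitUntilFinish();
  }
}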


 Jan

[1] https://beam.apache.org/blog/beam-2.25.0/

On 5/6/21 9:28 AM, Teodor Spæren wrote:

Hey!

I'm having problems with a program that I used to be able to run just 
fine with Flink, but now I'm getting a null pointer exception.


The beam program in question looks like this:


package no.spaeren.thesis.benchmarks.beam;

import no.spaeren.thesis.benchmarks.beam.helpers.CountSource;
import no.spaeren.thesis.benchmarks.beam.helpers.Printer;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import picocli.CommandLine;

import java.util.concurrent.Callable;

@CommandLine.Command(name = "BeamSimple", mixinStandardHelpOptions = 
true, description = "A simple beam job")

public class BeamSimple implements Callable {

    @CommandLine.Option(names = {"--from"}, defaultValue = "0")
    final Long from = 0L;

    @CommandLine.Option(names = {"--to"}, defaultValue = "1000")
    final Long to = 1000L;


    @Override
    public Void call() {
    FlinkPipelineOptions options = 
PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);

    options.setDisableMetrics(true);
    options.setRunner(FlinkRunner.class);
    // options.setShutdownSourcesAfterIdleMs(100L);
    // options.setParallelism(2);
    Pipeline p = Pipeline.create(options);


    //final PCollection ds = 
p.apply(GenerateSequence.from(this.from).to(this.to));
    final PCollection ds = p.apply(Read.from(new 
CountSource(this.from, this.to)));
    final PCollection db = 
ds.apply(MapElements.into(TypeDescriptors.longs()).via((Long x) -> x 
* 2));


    db.apply(ParDo.of(new Printer<>("BeamSimple: %d\n")));


    p.run().waitUntilFinish();

    return null;
    }
}


I'm using Beam version 2.27.0, but it also happens on 2.29.0. The 
Flink version is 1.12.1.


And the error message I'm getting is:


dragon % JAVA_HOME="/usr/lib/jvm/java-8-openjdk" 
~/madsci/thesis/world/tools/flink/bin/flink info -p 1 
~/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar 
BeamSimple
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on 
-Dswing.aatext=true



 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The 
program plan could not be fetched - the program aborted pre-maturely.


Classpath: 
[file:/home/rhermes/madsci/thesis/codes/mezurilo/target/mezurilo-bundled-1.0-SNAPSHOT.jar]


System.out: (none)

System.err: java.lang.NullPointerException
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SingletonImmutableList.<init>(SingletonImmutableList.java:38)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.of(ImmutableList.java:94)
at 
org.apache.beam.sdk.coders.NullableCoder.getCoderArguments(NullableCoder.java:102)
at 
org.apache.beam.sdk.coders.StructuredCoder.getComponents(StructuredCoder.java:49)
at 
org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)

at java.util.AbstractList.hashCode(AbstractList.java:541)
at 
org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)

at java.util.AbstractList.hashCode(AbstractList.java:541)
at 
org.apache.beam.sdk.coders.StructuredCoder.hashCode(StructuredCoder.java:69)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smearedHash(Hashing.java:54)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.get(HashBiMap.java:254)
at 
org.apache.beam.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:265)
at 
org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:231)
at 
org.apache.beam.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:210)
at 
org.apache.beam.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:176)
at 
org.apache.be

Re: Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-12 Thread Jan Lukavský

Hi Eleanore,

that's a good question. :)

There has been discussion about this in the PR for the mentioned Jira 
[1]. Generally, objectReuse allows Flink to reuse instances of 
deserialized objects instead of creating new ones, but due to how Beam 
defines Coders, new instances are created nevertheless. See the 
discussion for details.


 Jan

[1] https://github.com/apache/beam/pull/13240#issuecomment-721635620

On 4/12/21 7:29 PM, Eleanore Jin wrote:

Hi Jan,

Thanks a lot for the reply! This helps. I wonder if you have any idea 
what the difference is between the fasterCopy and objectReuse options?


Eleanore

On Fri, Apr 9, 2021 at 11:53 AM Jan Lukavský <je...@seznam.cz> wrote:


Hi Eleanore,

the --fasterCopy option disables cloning between operators (see [1]).
It should be safe to use it, unless your pipeline outputs an object
and later modifies the same instance. This is generally not supported
by the Beam model and is considered to be a user error. FlinkRunner
historically chose a "better-safe-than-sorry" approach and explicitly
cloned every received object between (non-shuffle) operators.
Enabling this option should increase performance. You can verify that
your pipeline is not doing any disallowed mutations using DirectRunner,
which checks this by default (unless --enforceImmutability=false is set).

  Jan

[1] https://issues.apache.org/jira/browse/BEAM-11146
<https://issues.apache.org/jira/browse/BEAM-11146>

On 4/9/21 7:57 AM, Eleanore Jin wrote:
> Hi community,
>
> I am upgrading from Beam 2.23.0 -> 2.28.0, and a new
> FlinkPipelineOption is introduced: fasterCopy.
>
> Can you please help me understand what is the difference between
the
> option objectReuse vs fasterCopy?
>
> Thanks a lot!
> Eleanore



Re: Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-09 Thread Jan Lukavský

Hi Eleanore,

the --fasterCopy option disables cloning between operators (see [1]). It 
should be safe to use it, unless your pipeline outputs an object and 
later modifies the same instance. This is generally not supported by the 
Beam model and is considered to be a user error. FlinkRunner 
historically chose a "better-safe-than-sorry" approach and explicitly 
cloned every received object between (non-shuffle) operators. Enabling 
this option should increase performance. You can verify that your 
pipeline is not doing any disallowed mutations using DirectRunner, which 
checks this by default (unless --enforceImmutability=false is set).
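For illustration, a sketch (hypothetical class name; assumes the option 
is exposed on FlinkPipelineOptions as a fasterCopy getter/setter, per the 
Jira below) of enabling it programmatically; passing --fasterCopy=true on 
the command line should be equivalent:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FasterCopyDemo {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Skip the defensive clone between chained (non-shuffle) operators.
    // Only safe if the pipeline never mutates an element after outputting it;
    // validate that first with the DirectRunner's immutability checks.
    options.setFasterCopy(true);

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline ...
    p.run().waitUntilFinish();
  }
}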


 Jan

[1] https://issues.apache.org/jira/browse/BEAM-11146

On 4/9/21 7:57 AM, Eleanore Jin wrote:

Hi community,

I am upgrading from Beam 2.23.0 -> 2.28.0, and a new 
FlinkPipelineOption is introduced: fasterCopy.


Can you please help me understand what is the difference between the 
option objectReuse vs fasterCopy?


Thanks a lot!
Eleanore


Re: General guidance

2021-03-29 Thread Jan Lukavský

Hi Julius,

which version of Beam do you run? There has been a fix for 2.25.0 [1] 
which could address what you see.


 Jan

[1] https://issues.apache.org/jira/browse/BEAM-10760

On 3/25/21 8:11 PM, Kenneth Knowles wrote:
This is a Beam issue indeed, though it is an issue with the 
FlinkRunner. So I think I will BCC the Flink list.


You may be in one of the following situations:
 - These timers should not be viewed as distinct by the runner, but 
deduped, per 
https://issues.apache.org/jira/browse/BEAM-8212#comment-16946013 

 - There is a different problem if you have an unbounded key space 
with windows that never expire, since then there are unbounded numbers 
of truly distinct (but irrelevant) timers. It is also the runner's 
responsibility to simply not set timers that can never fire.


Kenn

On Thu, Mar 25, 2021 at 11:57 AM Almeida, Julius <julius_alme...@intuit.com> wrote:


Hi Team,

My streaming pipeline is based on Beam and runs on the Flink
runner with RocksDB as the state backend.

Over time I am seeing a memory spike, and after looking at a heap
dump I see records in ‘__StatefulParDoGcTimerId’ which never
seem to be cleaned up.

Found this Jira https://issues.apache.org/jira/browse/BEAM-8212
describing the issue I believe I am facing.

Any pointers would be helpful in identifying possible solution.

Thanks,

Julius



Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
I think there could still be problems in some corner cases. The problem 
is that elements considered 'late' by the timestamp combiner have a 
different definition than what is marked as late in PaneInfo. So you can 
have a corner case where PaneInfo would be ON_TIME, but the timestamp 
would still be shifted to the end of the window. This would probably not 
happen too often, but it can happen. If that is fine for your use case, 
then this could work.


Jan

On 1/13/21 3:59 PM, Raman Gupta wrote:
Hmm, I think I've found a simple solution... adding this to the 
beginning of my looping timer @ProcessElement function:


// late elements don't need to affect our looping timer,
// pass them through without modification
// this is kind of a work-around for https://issues.apache.org/jira/browse/BEAM-2262
// but I think makes sense in general for looping timers when
// there is no need to trigger timers after the window is done
if (paneInfo.timing == PaneInfo.Timing.LATE) {
   receiver.output(element)
   return
}
At least all my unit tests are passing... is there any problem with 
this approach?


Thanks,
Raman


On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta <rocketra...@gmail.com> wrote:


(Replying to Reza) Yes, I am using TestStream for my unit test.
Other replies below.

On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

yes, there is a possible non-determinism, that is related to
the timestamp combiner. Timestamp combiners combine only
elements, that are not 'late' ([1]), meaning that their
timestamp is not preceding output watermark of the GBK.
Looking at the pipeline code I suppose that could be the cause.


Yes, the test stream in this test case does indeed send the
element in question "late". Here is the setup:

val base = Instant.EPOCH + 6.hours
val xStream: TestStream = TestStream.create(coder)
  .addElements(x["1"]) // this just initializes the looping timer
  // advance watermark past end of window that would normally
process x2
  .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
  .addElements(x["2"]) // now we see the element
  .advanceWatermarkToInfinity()

Here late element x["2"] has a timestamp of
1970-01-01T07:30:00.000Z and the watermark at the time x["2"] is
added is 1970-01-01T09:00:01.000Z.

So I get your point that the timestamp combiner is not used for
late elements, but if late elements are singly emitted as in this
pipeline, why do any timestamp modification at all? I would expect
them to arrive with their original timestamp, not one changed
from 1970-01-01T07:30:00.000Z to 1970-01-01T07:34:59.999Z (this is
the part that seems non-deterministic). What is the logic / reason
behind the pipeline setting this element's timestamp
to 1970-01-01T07:34:59.999Z?

You can make the pipeline deterministic by using
TimestampCombiner.END_OF_WINDOW (default).


It's definitely not ideal for this use case, but I'll consider it.

If you *need* to use the TimestampCombiner.EARLIEST, you can
probably do that by tweaking the looping timer stateful dofn
and fix timestamps there (using timer output timestamp).


I had already tried that but the pipeline throws an error that the
timestamp emitted cannot be earlier than the current element
timestamp.

Thanks,
Raman

  Jan

[1] https://issues.apache.org/jira/browse/BEAM-2262

On 1/12/21 5:26 PM, Raman Gupta wrote:

Your reply made me realize I removed the condition from my
local copy of the looping timer that brings the timer forward
if it encounters an earlier element later in the stream:

if (currentTimerValue == null ||
    currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {

Restoring that condition fixes that issue.

However, the reason I removed that condition in the first
place was because it was making a unit test non-deterministic
-- sometimes the element timestamps into the looping timer
didn't seem to match the element timestamps according to the
EARLIEST timestamp combiner defined, causing the timer to
execute an additional time.

The pipeline:

input
   // withAllowedTimestampSkew is deprecated, but as of now,
   // there is no replacement
   // https://issues.apache.org/jira/browse/BEAM-644
   .apply("XTimestamps", WithTimestamps
     .of { it.enteredAt.asJoda() }
     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
   )
   .apply("FixedTickWindows",
     Window.into(FixedWindows.of(5.minutes.asJoda()))
   .triggering(
     AfterWatermark

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
> What is the logic / reason behind the pipeline setting this element's 
timestamp to 1970-01-01T07:34:59.999Z?


There are reasons (which I cannot recall right now :)) why late elements 
should not simply be added to the combiner. If there are only late 
elements in the pane, the combiner actually gets no elements and shifts 
the timestamp to the end of the window (which is why 
TimestampCombiner.END_OF_WINDOW works well: it does that for all 
elements, regardless of whether they are late or not).


> I had already tried that but the pipeline throws an error that the 
timestamp emitted cannot be earlier than the current element timestamp.


Ah, right. The reason for that is that you cannot set an output timestamp 
that would precede the current output watermark (which is unknown to user 
code). In that case, the solution could be to replace the GBK (and its 
triggers and timestamp combiners) with a custom stateful ParDo that 
controls the output timestamp (it can set the timer output timestamp to 
the current element's timestamp whenever that is the current minimum).
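For illustration, a rough sketch of that idea (hypothetical names, 
simplified; not code from this thread): a stateful DoFn buffers elements 
per key and sets an event-time timer whose output timestamp is the minimum 
buffered timestamp. That output timestamp acts as a watermark hold, so the 
buffered elements can later be re-emitted at their original timestamps 
without becoming late downstream.

import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// Buffers elements per key and re-emits them from an event-time timer.
// Assumes elements are not already late when they reach this transform.
class BufferAndReEmitFn extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<String, String>>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
          KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

  @StateId("minTs")
  private final StateSpec<ValueState<Instant>> minTsSpec = StateSpecs.value(InstantCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") BagState<TimestampedValue<KV<String, String>>> buffer,
      @StateId("minTs") ValueState<Instant> minTs,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(element, timestamp));
    Instant currentMin = minTs.read();
    if (currentMin == null || timestamp.isBefore(currentMin)) {
      minTs.write(timestamp);
      // The timer's output timestamp holds the output watermark for this key.
      flush.withOutputTimestamp(timestamp).set(timestamp);
    }
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext context,
      @StateId("buffer") BagState<TimestampedValue<KV<String, String>>> buffer,
      @StateId("minTs") ValueState<Instant> minTs) {
    for (TimestampedValue<KV<String, String>> value : buffer.read()) {
      // Allowed because the hold kept the output watermark at the minimum timestamp.
      context.outputWithTimestamp(value.getValue(), value.getTimestamp());
    }
    buffer.clear();
    minTs.clear();
  }
}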


Jan

On 1/13/21 3:42 PM, Raman Gupta wrote:
(Replying to Reza) Yes, I am using TestStream for my unit test. Other 
replies below.


On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote:


Hi,

yes, there is a possible non-determinism, that is related to the
timestamp combiner. Timestamp combiners combine only elements,
that are not 'late' ([1]), meaning that their timestamp is not
preceding output watermark of the GBK. Looking at the pipeline
code I suppose that could be the cause.


Yes, the test stream in this test case does indeed send the element in 
question "late". Here is the setup:


val base = Instant.EPOCH + 6.hours
val xStream: TestStream = TestStream.create(coder)
  .addElements(x["1"]) // this just initializes the looping timer
  // advance watermark past end of window that would normally process x2
  .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
  .addElements(x["2"]) // now we see the element
  .advanceWatermarkToInfinity()

Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z 
and the watermark at the time x["2"] is added is 
1970-01-01T09:00:01.000Z.


So I get your point that the timestamp combiner is not used for late 
elements, but if late elements are singly emitted as in this pipeline, 
why do any timestamp modification at all? I would expect them to 
arrive with their original timestamp, not one changed 
from 1970-01-01T07:30:00.000Z to 1970-01-01T07:34:59.999Z (this is the 
part that seems non-deterministic). What is the logic / reason behind 
the pipeline setting this element's timestamp to 1970-01-01T07:34:59.999Z?


You can make the pipeline deterministic by using
TimestampCombiner.END_OF_WINDOW (default).


It's definitely not ideal for this use case, but I'll consider it.

If you *need* to use the TimestampCombiner.EARLIEST, you can
probably do that by tweaking the looping timer stateful dofn and
fix timestamps there (using timer output timestamp).


I had already tried that but the pipeline throws an error that the 
timestamp emitted cannot be earlier than the current element timestamp.


Thanks,
Raman

  Jan

[1] https://issues.apache.org/jira/browse/BEAM-2262

On 1/12/21 5:26 PM, Raman Gupta wrote:

Your reply made me realize I removed the condition from my local
copy of the looping timer that brings the timer forward if it
encounters an earlier element later in the stream:

if (currentTimerValue == null ||
    currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {

Restoring that condition fixes that issue.

However, the reason I removed that condition in the first place
was because it was making a unit test non-deterministic --
sometimes the element timestamps into the looping timer didn't
seem to match the element timestamps according to the EARLIEST
timestamp combiner defined, causing the timer to execute an
additional time.

The pipeline:

input
   // withAllowedTimestampSkew is deprecated, but as of now,
   // there is no replacement
   // https://issues.apache.org/jira/browse/BEAM-644
   .apply("XTimestamps", WithTimestamps
     .of { it.enteredAt.asJoda() }
     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
   )
   .apply("FixedTickWindows",
     Window.into(FixedWindows.of(5.minutes.asJoda()))
       .triggering(
         AfterWatermark.pastEndOfWindow()
           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
           .withLateFirings(AfterPane.elementCountAtLeast(1))
       )
       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
       .discardingFiredPanes()
 

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský

Hi,

yes, there is a possible non-determinism, which is related to the 
timestamp combiner. Timestamp combiners combine only elements that are 
not 'late' ([1]), meaning that their timestamp is not preceding the output 
watermark of the GBK. Looking at the pipeline code I suppose that could 
be the cause. You can make the pipeline deterministic by using 
TimestampCombiner.END_OF_WINDOW (the default). If you *need* to use 
TimestampCombiner.EARLIEST, you can probably do that by tweaking the 
looping timer stateful DoFn and fixing timestamps there (using the timer 
output timestamp).


  Jan

[1] https://issues.apache.org/jira/browse/BEAM-2262

On 1/12/21 5:26 PM, Raman Gupta wrote:
Your reply made me realize I removed the condition from my local copy 
of the looping timer that brings the timer forward if it encounters an 
earlier element later in the stream:


if (currentTimerValue == null ||
    currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {


Restoring that condition fixes that issue.

However, the reason I removed that condition in the first place was 
because it was making a unit test non-deterministic -- sometimes the 
element timestamps into the looping timer didn't seem to match the 
element timestamps according to the EARLIEST timestamp combiner 
defined, causing the timer to execute an additional time.


The pipeline:

input
   // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
   // https://issues.apache.org/jira/browse/BEAM-644
   .apply("XTimestamps", WithTimestamps
     .of { it.enteredAt.asJoda() }
     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
   )
   .apply("FixedTickWindows",
     Window.into(FixedWindows.of(5.minutes.asJoda()))
       .triggering(
         AfterWatermark.pastEndOfWindow()
           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
           .withLateFirings(AfterPane.elementCountAtLeast(1))
       )
       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
       .discardingFiredPanes()
       .withTimestampCombiner(TimestampCombiner.EARLIEST)
   )
   .apply("KeyByUser", WithKeys.of { it.userId })
   .apply("GroupByUser", GroupByKey.create())
   .apply("GlobalWindowsLoopingStatefulTimer", Window.into>>(GlobalWindows())
       .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
       .discardingFiredPanes()
       .withTimestampCombiner(TimestampCombiner.EARLIEST)
   )
   .apply("LoopingStatefulTimer",
     ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))

The looping timer receives an @Timestamp value in the process function of:

1970-01-01T07:34:59.999Z

but the earliest timestamp of the (single) element in the elements 
iterable is:


1970-01-01T07:30:00.000Z

I would have thought given my timestamp combiners on my windows that 
the timestamp should have been 07:30:00.000Z. Is there something wrong 
in my pipeline that is causing this non-deterministic behavior?


Thanks,
Raman

On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <je...@seznam.cz> wrote:


Hi Raman,

can you share the details of the pipeline? How exactly are you
using the
looping timer? Timer as described in the linked blog post should be
deterministic even when the order of the input elements is undefined.
Does your logic depend on element ordering?

  Jan

On 1/12/21 3:18 PM, Raman Gupta wrote:
> Hello, I am building and testing a pipeline with the direct runner.
> The pipeline includes a looping timer -
> https://beam.apache.org/blog/looping-timers/.
>
> For now, I am using JdbcIO to obtain my input data, though when put
> into production the pipeline will use PubSubIO.
>
> I am finding that the looping timer begins producing outputs at a
> random event time, which makes some sense given the
randomization of
> inputs in the direct runner. However, this makes the results of
> executing my pipeline with the direct runner completely
non-deterministic.
>
> So:
>
> 1) Is there a way to turn off this non-deterministic behavior, but
> just for the GlobalWindow / LoopingTimer?
>
> 2) Perhaps alternatively, is there a way to "initialize" the
looping
> timer when the pipeline starts, rather than when it first sees an
> element? Perhaps a side input?
>
> 3) Am I right in assuming that when I move this pipeline to
pub/sub io
> and operate it in streaming mode, this issue will go away?
>
> Thanks!
> Raman
>



Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Jan Lukavský

Hi Raman,

can you share the details of the pipeline? How exactly are you using the 
looping timer? The timer as described in the linked blog post should be 
deterministic even when the order of the input elements is undefined. 
Does your logic depend on element ordering?


 Jan

On 1/12/21 3:18 PM, Raman Gupta wrote:
Hello, I am building and testing a pipeline with the direct runner. 
The pipeline includes a looping timer - 
https://beam.apache.org/blog/looping-timers/.


For now, I am using JdbcIO to obtain my input data, though when put 
into production the pipeline will use PubSubIO.


I am finding that the looping timer begins producing outputs at a 
random event time, which makes some sense given the randomization of 
inputs in the direct runner. However, this makes the results of 
executing my pipeline with the direct runner completely non-deterministic.


So:

1) Is there a way to turn off this non-deterministic behavior, but 
just for the GlobalWindow / LoopingTimer?


2) Perhaps alternatively, is there a way to "initialize" the looping 
timer when the pipeline starts, rather than when it first sees an 
element? Perhaps a side input?


3) Am I right in assuming that when I move this pipeline to pub/sub io 
and operate it in streaming mode, this issue will go away?


Thanks!
Raman



Re: Question regarding GoupByKey operator on unbounded data

2020-12-14 Thread Jan Lukavský

Hi,

I think what you might be looking for is "stateful processing"; please 
have a look at [1]. Note that the input to a stateful DoFn must be of 
type KV, which then ensures behavior similar to Flink's keyBy.
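For illustration, a minimal sketch (hypothetical names) of such a stateful 
DoFn in Java, keeping a running count per key with no windowing or trigger, 
similar to what Flink's keyBy plus keyed state gives you:

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// State is scoped implicitly to the element's key (and window, here global).
class PerKeyCountFn extends DoFn<KV<String, String>, KV<String, Long>> {

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("count") ValueState<Long> count,
      OutputReceiver<KV<String, Long>> out) {
    Long current = count.read();
    long updated = (current == null ? 0L : current) + 1L;
    count.write(updated);
    out.output(KV.of(element.getKey(), updated));
  }
}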


Best,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

On 12/13/20 6:27 AM, Tao Li wrote:


Sorry I think I had some misunderstanding on keyBy API from Flink. 
It’s not exactly equivalent to GroupByKey from Beam. So please ignore 
my question and this email thread. Thanks for help though 😊


*From: *Tao Li 
*Date: *Friday, December 11, 2020 at 7:29 PM
*To: *"user@beam.apache.org" , Reuven Lax 

*Cc: *Mehmet Emre Sahin , Ying-Chang Cheng 


*Subject: *Re: Question regarding GoupByKey operator on unbounded data

Would Combine.PerKey work for my case? Seems like it does not require 
a window function.


At the same time it seems that this operator is typically used to 
generate some aggregated output (e.g. count) instead of the value 
list. So I am not sure if it’s suitable for my use case.


Please advise. Thanks!

*From: *Tao Li 
*Reply-To: *"user@beam.apache.org" 
*Date: *Friday, December 11, 2020 at 10:29 AM
*To: *"user@beam.apache.org" , Reuven Lax 

*Cc: *Mehmet Emre Sahin , Ying-Chang Cheng 


*Subject: *Re: Question regarding GoupByKey operator on unbounded data

Hi @Reuven Lax  basically we have a flink app 
that does a stream processing. It uses a KeyBy operation to generate a 
keyed stream. Since we need to query all historical data of the input, 
we are not specifying a window function or a trigger in this flink 
app, which is fine.


Now we would like to convert this flink app to a beam app. The problem 
is that for a unbounded PCollection, beam requires either a non-global 
windowing or an aggregation trigger to perform a GroupByKey operation.


I was thinking about applying a sliding window with a huge size (say 1 
year) to accommodate this Beam requirement. But not sure if this is 
feasible or a good practice.


So what’s your recommendation to solve this problem? Thanks!

*From: *Reuven Lax 
*Reply-To: *"user@beam.apache.org" 
*Date: *Thursday, December 10, 2020 at 3:07 PM
*To: *user 
*Cc: *Mehmet Emre Sahin , Ying-Chang Cheng 


*Subject: *Re: Question regarding GoupByKey operator on unbounded data

Can you explain more about what exactly you are trying to do?

On Thu, Dec 10, 2020 at 2:51 PM Tao Li wrote:


Hi Beam community,

I got a quick question about the GroupByKey operator. According to this
doc, if we are using an unbounded PCollection, it’s required to specify
either non-global windowing or an aggregation trigger in order to
perform a GroupByKey operation.

In comparison, the KeyBy operator from Flink does not have such a hard
requirement for streamed data.

In our use case, we do need to query all historical streamed data
and group by keys. KeyBy from Flink satisfies our need, but Beam
GroupByKey does not satisfy this need. I thought about applying a
sliding window with a very large size (say 1 year), so that we can
query the past 1 year’s data. But I am not sure whether this is
feasible or a good practice.


Re: Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

2020-11-04 Thread Jan Lukavský

Hi Tobias,

this looks like a bug. The clearGlobalState method was introduced in 
2.25.0, and it seems it might have issues related to RocksDB. Can you 
file a Jira for that, please?


Thanks,

 Jan

On 11/4/20 9:50 AM, Kaymak, Tobias wrote:
When running our Kafka-To-BigQuery pipeline with the Flink 1.11.2 
Docker image,
the following exception is visible for the failing job on the *job 
manager*:


2020-11-04 09:27:14
java.lang.RuntimeException: Failed to cleanup global state.
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be 
cast to org.apache.flink.runtime.state.VoidNamespace
at 
org.apache.flink.runtime.state.VoidNamespaceSerializer.serialize(VoidNamespaceSerializer.java:32)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils.writeNameSpace(RocksDBKeySerializationUtils.java:77)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.getKeys(RocksDBKeyedStateBackend.java:291)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:242)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:141)

... 17 more
This is from the *task manager's* logs:
2020-11-04 08:46:31,250 WARN 
 org.apache.flink.runtime.taskmanager.Task                    [] - 
BigQueryIO.Write/BatchLoads/JobIdCreationRoot_LOAD/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Splittable 
ueryIO.Write/BatchLoads/CreateJobId_LOAD/ParMultiDo(Anonymous) -> 
BigQueryIO.Write/BatchLoads/JobIdSideInput_LOAD/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous) 
-> ToKeyedWorkItem (1/1) (bebac6c581d1b8ece88007ec0 
java.lang.RuntimeException: Failed to cleanup global state.   at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.clearGlobalState(FlinkStateInternals.java:150) 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?] 
  at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.maybeEmitWatermark(DoFnOperator.java:791) 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?] 
  at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:741) 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?] 
  at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:713) 
~[blob_p-656af447c7120652ba6a8f48516776effc33dc07-8df5e6b00c52050981a9af655c97d0c9:?] 
  at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167) 
~[flink-dist_2.11-
