[Interactive Beam] Changes to local pipeline executions

2019-12-04 Thread Ning Kang
*If you are not an Interactive Beam user, you can ignore this email.*

Hi Interactive Beam users,

We've recently made some changes to how Interactive Beam understands the
context of the pipelines/PCollections defined in your notebook/code.

If you write Beam pipelines with the InteractiveRunner directly in notebook
cells, like the Interactive Beam Examples, or define everything in
"__main__", you will not be affected by the changes.

If you define your pipelines in a local scope, such as a function (unit
tests are one example scenario), and you rely on interactive features to
introspect the data of a PCollection after a pipeline run, you might see an
error such as:
raise ValueError('PCollection not available, please run the pipeline.')

This is because Interactive Beam now "watches" the "__main__" scope by
default to provide features implicitly. To avoid the error, you only need
to tell Interactive Beam to "watch" your local scopes too.
Here is an example fix:
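import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
...
def some_func():
    p = beam.Pipeline(InteractiveRunner())
    # SomeTransform stands for your own transform(s).
    pcoll = p | 'SomeTransform' >> SomeTransform()
    ...
    # Let Interactive Beam watch this function's local scope (not just
    # "__main__") so it can find p and pcoll.
    interactive_beam.watch(locals())
    result = p.run()
    ...
...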

Thanks for using Interactive Beam!

Ning.


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-02 Thread Ning Kang
Here is a question answered on StackOverflow:
https://stackoverflow.com/questions/27221292/when-should-i-use-javas-stringwriter

Could you try using StringBuilder instead since the usage is not
appropriate for a StringWriter?


On Wed, Sep 2, 2020 at 2:49 PM Talat Uyarer 
wrote:

> Hi,
>
> I have an issue with string concatenation. You can see my code below [1]. I
> have a step in my Dataflow job that concatenates strings, but whenever I
> use that step, my job starts getting JVM restart errors.
>
>  Shutting down JVM after 8 consecutive periods of measured GC thrashing.
>> Memory is used/total/max = 4112/5994/5994 MB, GC last/max = 97.36/97.36 %,
>> #pushbacks=3, gc thrashing=true. Heap dump not written.
>
>
> I also tried using Avro rather than String. With Avro, it works fine
> without any issues. Do you have any suggestions?
>
> Thanks
>
> [1] https://dpaste.com/7RTV86WQC
>
>
>


Re: OOM issue on Dataflow Worker by doing string manipulation

2020-09-03 Thread Ning Kang
 buffers very quickly. 
>>>>>> I'm not that familiar with StringBuilder; is there a way to reset it and
>>>>>> reuse the existing capacity? Maybe `writer.delete(0, writer.length())` [1]?
>>>>>>
>>>>>> [1]
>>>>>> https://stackoverflow.com/questions/242438/is-it-better-to-reuse-a-stringbuilder-in-a-loop
>>>>>>
>>>>>> On Wed, Sep 2, 2020 at 3:02 PM Talat Uyarer <
>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Sorry for the wrong import. You can see on the code I am using
>>>>>>> StringBuilder.


Re: periodic impulse bug

2020-12-21 Thread Ning Kang
I also ran into this issue some time ago. I couldn't figure out why, but
explicitly setting the start and end to integer values when building the
`PeriodicImpulse` transform could be a workaround.
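For illustration, here is a stdlib-only sketch of why numeric values avoid the error. It mirrors the expression at line 42 of periodicsequence.py from the traceback quoted below; `OpaqueDuration` is a hypothetical stand-in for a duration type without division, not Beam's actual class. The commented-out `PeriodicImpulse` call is an untested sketch that assumes the Python SDK's `start_timestamp`, `stop_timestamp` and `fire_interval` parameters; the interval should probably be a plain number too, since the reported error has a Duration on both sides of `/`.

```python
import math


def total_outputs(start, end, interval):
    # Same expression as periodicsequence.py line 42 in the quoted traceback.
    return math.ceil((end - start) / interval)


class OpaqueDuration:
    """Hypothetical stand-in for a duration type that supports subtraction
    but defines no division, like Duration in the reported Beam version."""

    def __init__(self, seconds):
        self.seconds = seconds

    def __sub__(self, other):
        return OpaqueDuration(self.seconds - other.seconds)


# Plain numbers (seconds): the division is ordinary arithmetic and succeeds.
print(total_outputs(0, 60, 5))  # 12

# Duration-like operands: '/' is undefined, so a TypeError is raised,
# similar to the one in the report.
try:
    total_outputs(OpaqueDuration(0), OpaqueDuration(60), OpaqueDuration(5))
except TypeError as err:
    print("TypeError:", err)

# In a pipeline, the workaround might look roughly like this (not executed
# here; check the parameter names against your Beam version):
#   import time
#   from apache_beam.transforms.periodicsequence import PeriodicImpulse
#   now = int(time.time())
#   p | PeriodicImpulse(start_timestamp=now, stop_timestamp=now + 60,
#                       fire_interval=5)
```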

On Mon, Dec 21, 2020 at 4:19 AM Manninger, Matyas <
matyas.mannin...@veolia.com> wrote:

> Dear Beam users,
>
> In the Python SDK, I tried using PeriodicImpulse, but it seems like there
> is an internal bug. In periodicsequence.py, on line 42, a value of type
> Duration is divided, but no division operation is defined for it. What am
> I missing? Is there a workaround for this? Here is the error message that
> led to this "discovery":
> File "/home/user/.virtualenvs/dflowenv/lib/python3.8/site-packages/apache_beam/transforms/periodicsequence.py",
> line 42, in initial_restriction
>     total_outputs = math.ceil((end - start) / interval)
> TypeError: unsupported operand type(s) for /: 'Duration' and 'Duration'
> [while running 'read/periodic_impulse/GenSequence/PairWithRestriction']
>
>


Re: Any easy way to extract values from PCollection?

2021-04-22 Thread Ning Kang
+1 to Brian's answer.

In Java, you can add

singleValuedPcollection.apply("Write single value",
    TextIO.write().to(options.getSomeGcsPath()));

as the last step of your pipeline.

Then, in your program, after running the pipeline (and waiting until it
finishes), use the Cloud Storage Java client library to read the file and
extract the value.

On Thu, Apr 22, 2021 at 10:45 AM Brian Hulette  wrote:

> I don't think there's an easy answer to this question, in general all you
> can do with a PCollection is indicate you'd like to write it out to an IO.
> There has been some work in the Python SDK on "Interactive Beam" which is
> designed for using the Python SDK interactively in a notebook environment.
> It will let you collect() a PCollection - meaning it runs the pipeline and
> materializes the result. There's no such capability for the other SDKs
> though.
>
> On Wed, Apr 21, 2021 at 8:24 PM Tao Li  wrote:
>
>> Hi Beam community,
>>
>>
>>
>> This is the question I am asking:
>> https://stackoverflow.com/questions/28015924/how-to-extract-contents-from-pcollection-in-cloud-dataflow
>>
>>
>>
>> Thanks!
>>
>


Re: Question on printing out a PCollection

2021-04-30 Thread Ning Kang
Hi Tao,

The `show()` API works in any IPython notebook runtime, including Colab,
JupyterLab and the classic (pre-Lab) Jupyter Notebook, as long as you have
run `%pip install apache-beam[interactive]`.

Additionally, the `show_graph()` API needs the GraphViz binary installed;
details can be found in the README
<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive#pre-requisites>.

If you've created an Apache Beam notebook instance on Google Cloud, there
is an example notebook "Examples/Visualize_Data.ipynb" demonstrating how to
visualize data of PCollections with different libraries:

   - Native Interactive Beam Visualization
   - Pandas DataFrame
   - Matplotlib
   - Seaborn
   - Bokeh
   - D3.js

Hope this helps!

Ning

On Fri, Apr 30, 2021 at 9:24 AM Brian Hulette  wrote:

> +Ning Kang  +Sam Rohde 
>
> On Thu, Apr 29, 2021 at 6:13 PM Tao Li  wrote:
>
>> Hi Beam community,
>>
>>
>>
>> The notebook console from Google Cloud defines a show() API to display a
>> PCollection which is very neat:
>> https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
>>
>>
>>
>> If we are using a regular jupyter notebook to run beam app, how can we
>> print out a PCollection easily? What’s the best practice? Thanks!
>>
>>
>>
>


Re: beam/flink cluster with podman

2022-12-05 Thread Ning Kang via user
Hi Matthis,

Beam Notebooks (InteractiveRunner) support notebook-managed Flink clusters;
for details, see:
https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#interactive_flinkrunner_on_notebook-managed_clusters
See also the "open source support" section of this blog post:
https://cloud.google.com/blog/products/data-analytics/interactive-beam-pipeline-ml-inference-at-scale-in-notebook

If you have your own SDK container, use this configuration:

from apache_beam.options.pipeline_options import PortableOptions

options.view_as(PortableOptions).environment_config = (
    'YOUR_REGISTRY/beam_python3.x_sdk:2.4x.0')

and leave environment_type at its default value, "DOCKER".

To control the job parallelism, use this parameter (not the one in the test
file):

from apache_beam.options.pipeline_options import FlinkRunnerOptions

# The parallelism is applied to each step, so if your pipeline has 10 steps,
# you end up having 150 * 10 tasks scheduled that can theoretically be
# executed in parallel by the 320 (upper bound) slots/workers/threads.
options.view_as(FlinkRunnerOptions).parallelism = 150


To tune the throughput/parallelism of your Flink cluster, here are a few
example knobs:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py#L63-L69

Ning.

On Sun, Dec 4, 2022 at 9:32 AM Matthis Leicht 
wrote:

> Hello everybody,
>
> does anyone have experience with setting up a flink-cluster to execute
> beam pipelines over the python sdk?
>
> Because rootful containers are not allowed where I want to run the
> cluster, I'm trying to do this with podman/podman-compose.
>
> I'm trying to make this test pipeline work:
> https://github.com/matleicht/podman_beam_flink
> Environment:
> Debian 11 with podman and Python 3.9.2 (with apache-beam==2.38.0 and
> podman-compose) installed.
>
> The setup of the cluster defined in:
> https://github.com/matleicht/podman_beam_flink/blob/master/docker-compose.yml
> 1x flink-jobmanager (flink version 1.14)
> 1x flink-taskmanager
> 1x python harness sdk
> I chose to create an SDK container manually because I don't have Docker
> installed, and Flink fails when it tries to create a container via
> Docker.
>
> When I try to run the pipeline (
> https://github.com/matleicht/podman_beam_flink/blob/master/pipeline_test.py),
> the SDK worker gets stuck with the container lock:
>
> 2022/12/04 16:13:02 Starting worker pool 1: python -m
> apache_beam.runners.worker.worker_pool_main --service_port=5
> --container_executable=/opt/apache/beam/boot
> Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
> '--logging_endpoint=localhost:45087',
> '--artifact_endpoint=localhost:35323',
> '--provision_endpoint=localhost:36435',
> '--control_endpoint=localhost:33237']
> 2022/12/04 16:16:31 Failed to obtain provisioning information: failed to
> dial server at localhost:36435
> caused by:
> context deadline exceeded
>
> I suspect that I have an error in the network setup, or that some
> configuration is missing for the harness worker, but I could not figure out
> the problem.
>
> Has anyone tried this on podman, or can anyone spot the error in the
> container configuration and help me?
>
> My ultimate goal is to create a streaming pipeline that reads data from a
> Kafka instance, processes it, and writes back to it.
>
> Thanks for your help in advance.
> Best regards
> Matthis
>
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-19 Thread Ning Kang via user
Hi,

There is a Flink pipeline option, `parallelism`, that you can set:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L1504-L1510

This parallelism is applied to each step (there is no API to configure a
different value for each step). So if you have 10 steps and set the
parallelism to 10, there will be 100 tasks created. You may use
`max_parallelism` to limit the pipeline-wide parallelism.

The reason you may want to limit max_parallelism is that a Flink cluster
might run into network issues when too many tasks run in parallel. If you
control the cluster's creation, you can configure the Flink cluster to
allocate more resources to the task managers and so increase the capacity
for concurrent tasks. This is specific to Flink; you can find more guidance
in Flink's documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/

Ning.




On Wed, Apr 19, 2023 at 8:23 AM Nimalan Mahendran <
nima...@liminalinsights.com> wrote:

> Same need here, using Flink runner. We are processing a pcollection
> (extracting features per element) then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.
>
> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang  wrote:
>
>> Hi Reuven,
>>
>> It would be better to set parallelism for operators, as I mentioned
>> before, there may be multiple groupby, join operators in one pipeline, and
>> their parallelism can be different due to different input data sizes.
>>
>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax  wrote:
>>
>>> Jeff - does setting the global default work for you, or do you need
>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>
>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw 
>>> wrote:
>>>
 Yeah, I don't think we have a good per-operator API for this. If we
 were to add it, it probably belongs in ResourceHints.

 On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax  wrote:

> Looking at FlinkPipelineOptions, there is a parallelism option you can
> set. I believe this sets the default parallelism for all Flink operators.
>
> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:
>
>> Thanks Holden, this would work for Spark, but Flink doesn't have such a
>> mechanism, so I am looking for a general solution on the Beam side.
>>
>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
>> wrote:
>>
>>> To a (small) degree, Spark's “new” AQE might be able to help, depending
>>> on what kind of operations Beam is compiling it down to.
>>>
>>> Have you tried setting spark.sql.adaptive.enabled &
>>> spark.sql.adaptive.coalescePartitions.enabled
>>>
>>>
>>>
>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
 I see. Robert - what is the story for parallelism controls on
 GBK with the Spark or Flink runners?

 On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang 
 wrote:

> No, I don't use dataflow, I use Spark & Flink.
>
>
> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax 
> wrote:
>
>> Are you running on the Dataflow runner? If so, Dataflow, unlike Spark
>> and Flink, dynamically modifies the parallelism as the operator runs, so
>> there is no need to have such controls. In fact, these specific controls
>> wouldn't make much sense for the way Dataflow implements these operators.
>>
>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang 
>> wrote:
>>
>>> Just for performance tuning like in Spark and Flink.
>>>
>>>
>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
 What are you trying to achieve by setting the parallelism?

 On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang 
 wrote:

> Thanks Reuven, what I mean is to set the parallelism at the operator
> level. And the input size of the operator is unknown at compile time if it
> is not a source operator.
>
> Here's an example of flink
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
> Spark also supports setting operator-level parallelism (see groupByKey
>>>

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Ning Kang via user
Hi Jan,

The approach works when your pipeline doesn't have too many operators. One
downside is that the operator that needs the highest parallelism can use at
most #total_task_slots / #operators of the resources available in the
cluster.

Another downside is wasted resources for smaller operators that cannot
make full use of the task slots assigned to them. For an operator with
parallelism 10, you might see only 1 of 10 tasks running while the other 9
sit idle, especially when it's doing an aggregation like a SUM.

One mitigation is that, for operators following another operator with high
fanout, we can explicitly add a Reshuffle to allow a higher parallelism.
But this circles back to the first downside: if your pipeline has
exponentially high fanout through it, setting a single parallelism for the
whole pipeline is not ideal, because it significantly limits the
scalability of your pipeline.
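The arithmetic above can be sketched in a few lines. This assumes every task of every operator occupies its own task slot (no slot sharing) and uses a hypothetical cluster of 320 slots and 10 operators; the helper names are made up for illustration.

```python
def max_uniform_parallelism(total_task_slots: int, num_operators: int) -> int:
    # With one pipeline-wide parallelism P and each task in its own slot,
    # P * num_operators tasks must fit into the available slots.
    return total_task_slots // num_operators


def idle_share(parallelism: int, busy_tasks: int) -> float:
    # Fraction of an operator's tasks with no work, e.g. a global SUM whose
    # final aggregation happens in a single task.
    return (parallelism - busy_tasks) / parallelism


# Hypothetical cluster: 320 task slots shared by 10 operators.
print(max_uniform_parallelism(320, 10))  # 32
# An aggregation with parallelism 10 where only one task does the work.
print(idle_share(10, 1))  # 0.9
```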

Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský  wrote:

> Hi,
>
> this topic was discussed many years ago and the conclusion there was that
> setting the parallelism of individual operators via FlinkPipelineOptions
> (or ResourceHints) is possible, but would be somewhat cumbersome.
> Although I understand that it "feels" weird to have high parallelism for
> operators with small inputs, does this actually bring any relevant
> performance impact? I always use parallelism based on the largest operator
> in the Pipeline and this seems to work just fine. Is there any particular
> need or measurable impact of such approach?
>
>  Jan

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-21 Thread Ning Kang via user
Hi Jan,

To generalize the per-stage parallelism configuration, we should have a
feature request (FR) proposing the capability to explicitly set an
autoscaling policy (in this case, a fixed size per stage) in Beam pipelines.

Per-step or per-stage parallelism, and fusion/optimization, are not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like
until the pipeline is submitted to a runner, which makes configuring the
parallelism/workers per stage not straightforward.
Flink's parallelism settings are documented here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
but they are still something of a black box, since you don't really know
how many tasks are actually spawned until you run a pipeline.

That being said, if we have a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in its own way.
For example, in a Flink job, each operator's/stage's task slots are
prorated by its number of keys, and the maximum parallelism is throttled by
task slot utilization.
As another example, in a Dataflow job, each stage scales horizontally by CPU
utilization and vertically by memory/disk utilization.

+d...@beam.apache.org 
Let's use this thread to discuss how to configure a pipeline for runners so
that they can scale workers appropriately without exposing runner-specific
details to the Beam model.

Ning.


On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský  wrote:

> Hi Ning,
>
> I might have missed that in the discussion, but we talk about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline are run in the same slots, thus the downsides are limited. You can
> enforce streaming mode using --streaming command-line argument. But yes,
> this might have other implications. For batch only it obviously makes sense
> to limit parallelism of a (fused) 'stage', which is not a transform-level
> concept, but rather a more complex union of transforms divided by shuffle
> barrier. Would you be willing to start a follow-up thread in @dev mailing
> list for this for deeper discussion?
>
>  Jan

Re: Interactive runner that uses flink runner can read kafka messages?

2024-03-07 Thread Ning Kang via user
The Python SDK's ReadFromKafka is an external (cross-language) transform
implemented in Java, similar to SqlTransform. InteractiveRunner doesn't
support it.

That being said, if you want to work interactively with external
transforms, you may follow the workaround for SQL
(https://cloud.google.com/dataflow/docs/guides/notebook-advanced#beam-sql).
The source code is at
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive/sql


Ning.

On Wed, Mar 6, 2024 at 9:50 PM Jaehyeon Kim  wrote:

> Hello,
>
> I'm playing with the interactive runner in a notebook, with the Flink
> runner as the underlying runner. I wonder if it can read messages from
> Kafka. I checked the example notebook and it works. However, I cannot read
> Kafka messages; I get the following error:
>
>  KeyError: 'beam:transform:org.apache.beam:kafka_read_with_metadata:v2'
>
> Cheers,
> Jaehyeon
>
> *Here is the source.*
>
> import os
>
> import apache_beam as beam
> from apache_beam.io import kafka
> from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
> from apache_beam.runners.interactive import interactive_runner
> from apache_beam.runners.portability import flink_runner
>
> # decode_message, parse_json and EventLog are defined elsewhere (not shown).
>
> pipeline_opts = {
>     "job_name": "kafka-io",
>     "environment_type": "LOOPBACK",
>     "streaming": True,
>     "parallelism": 3,
>     "experiments": [
>         "use_deprecated_read"
>     ],  ## https://github.com/apache/beam/issues/20979
>     "checkpointing_interval": "6",
> }
> options = PipelineOptions([], **pipeline_opts)
> # Required; otherwise it complains when importing worker functions
> options.view_as(SetupOptions).save_main_session = True
>
> p = beam.Pipeline(
>     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()),
>     options=options
> )
> events = (
>     p
>     | "Read from Kafka"
>     >> kafka.ReadFromKafka(
>         consumer_config={
>             "bootstrap.servers": os.getenv(
>                 "BOOTSTRAP_SERVERS",
>                 "host.docker.internal:29092",
>             ),
>             "auto.offset.reset": "earliest",
>             # "enable.auto.commit": "true",
>             "group.id": "kafka-io",
>         },
>         topics=["website-visit"],
>     )
>     | "Decode messages" >> beam.Map(decode_message)
>     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> )
> result = p.run()
> result.wait_until_finish()
>
> *And here is the full error message.*
>
> WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'checkpointing_interval': '6'}
>
> ---------------------------------------------------------------------------
> KeyError                                  Traceback (most recent call last)
> Cell In[17], line 36
>      15 p = beam.Pipeline(
>      16     interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()), options=options
>      17 )
>      18 events = (
>      19     p
>      20     | "Read from Kafka"
>    (...)
>      34     | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
>      35 )
> ---> 36 results = p.run()
>      37 result.wait_until_finish()
>      38 # ib.options.recording_duration = "120s"
>      39 # ib.show(events)
>
> File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/pipeline.py:586, in Pipeline.run(self, test_runner_api)
>     584 finally:
>     585   shutil.rmtree(tmpdir)
> --> 586   return self.runner.run_pipeline(self, self._options)
>     587 finally:
>     588   if not is_in_ipython():
>
> File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/interactive_runner.py:148, in InteractiveRunner.run_pipeline(self, pipeline, options)
>     145 if isinstance(self._underlying_runner, FlinkRunner):
>     146   self.configure_for_flink(user_pipeline, options)
> --> 148 pipeline_instrument = inst.build_pipeline_instrument(pipeline, options)
>     150 # The user_pipeline analyzed might be None if the pipeline given has nothing
>     151 # to be cached and tracing back to the user defined pipeline is impossible.
>     152 # When it's None, there is no need to cache including the background
>     153 # caching job and no result to track since no background caching job is
>     154 # started at all.
>     155 if user_pipeline:
>     156   # Should use the underlying runner and run asynchronously.
>
> File ~/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/interactive/pipeline_instrument.py:756, in build_pipeline_instrument(pipeline, options)
>     742 def build_pipeline_instrument(pipeline, options=N