default_sdk_harness_log_level multi-language support

2024-05-24 Thread Ferran Fernández Garrido
Hi!

I am trying to adjust the log level for a Beam YAML pipeline that uses
Kafka. Since Kafka is a multi-language transform,
default_sdk_harness_log_level is not currently supported, which means
I can't limit the logs (Kafka can be a bit too verbose sometimes).
This could lead to increased logging costs if you use a cloud service
such as Dataflow to run the pipeline.
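
For reference, this is the kind of invocation I mean (in the style of a
typical Dataflow run); as far as I can tell the flag is honored by the
Python SDK harness container, but not by the Java containers that the
Kafka expansion service brings up:

python3 -m apache_beam.yaml.main --runner=DataflowRunner
--project=project_id --region=region --temp_location=temp_location
--pipeline_spec_file=yaml_pipeline.yml
--default_sdk_harness_log_level=WARNING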

Is there any initiative at the moment to support
default_sdk_harness_log_level in multi-language transforms? Maybe some
workaround? I suppose that in terms of Beam YAML, we could pass these
options to the schema providers somehow. However, I don't think that
solution is ideal, as it would not address the issue for other people
using multi-language transforms outside of Beam YAML.

Thanks!

Ferran


Re: [YAML] ReadFromKafka with yaml

2024-01-10 Thread Ferran Fernández Garrido
Hi Yarden,

If you are using Dataflow as a runner, you can already use
ReadFromKafka (introduced originally in version 2.52). Dataflow will
handle the expansion service automatically, so you don't have to do
anything.

If you want to run it locally for development purposes, you'll have to
build the Docker image. You can check out the project and run:

./gradlew :sdks:java:container:java8:docker
-Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest

(where $DOCKER_ROOT is your Docker repository location)
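
For example, with a hypothetical repository root (adjust it to wherever
you push your images), that would be:

./gradlew :sdks:java:container:java8:docker
-Pdocker-repository-root=gcr.io/my-project/beam -Pdocker-tag=latest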

Then, for instance, if you want to run your custom Docker image in
Dataflow, you could do the following.

(Build the Python SDK first: run python setup.py sdist to get
apache-beam-2.53.0.dev0.tar.gz.)

You'll also have to build the expansion service that Kafka uses (in case
you've changed something in KafkaIO):

./gradlew :sdks:java:io:expansion-service:build

python3 -m apache_beam.yaml.main --runner=DataflowRunner
--project=project_id --region=region --temp_location=temp_location
--pipeline_spec_file=yaml_pipeline.yml
--staging_location=staging_location
--sdk_location="path/apache-beam-2.53.0.dev0.tar.gz"
--sdk_harness_container_image_overrides=".*java.*,$DOCKER_ROOT:latest"
--streaming

This is an example of how to read JSON events from Kafka in Beam YAML:

- type: ReadFromKafka
  config:
    topic: 'TOPIC_NAME'
    format: JSON
    bootstrap_servers: 'BOOTSTRAP_SERVERS'
    schema: 'JSON_SCHEMA'
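
If it helps, a complete minimal pipeline spec around that transform could
look roughly like this (untested sketch; LogForTesting is just a
placeholder sink that prints each record, assuming it's available in your
Beam version):

pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      config:
        topic: 'TOPIC_NAME'
        format: JSON
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'JSON_SCHEMA'
    # placeholder sink that just logs each record
    - type: LogForTesting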

Best,
Ferran

On Wed, Jan 10, 2024 at 14:11, Yarden BenMoshe
() wrote:
>
> Hi,
>
> I am trying to consume a kafka topic using ReadFromKafka transform.
>
> If I got it right, since ReadFromKafka is originally written in Java, an
> expansion service is needed and the default environment is set to DOCKER. In
> the current implementation I can see that the expansion service field is not
> adjustable (I'm not able to pass it as part of the transform's config).
> Is there currently a way to use ReadFromKafka from a pipeline written with
> the YAML API? If so, an explanation would be much appreciated.
>
> I saw there's a workaround suggested online of using Docker-in-Docker, but
> I would prefer to avoid it.
>
> Thanks
> Yarden


Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Ferran Fernández Garrido
Hi Yarden,

Since it's a bounded source, you could try the Sql transform, grouping
by the timestamp column. Here are some examples of grouping:

https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
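
For instance, something along these lines (an untested sketch; event_ts is
just a stand-in for your timestamp column, and if I remember correctly the
Sql transform refers to its single input as PCOLLECTION):

- type: Sql
  config:
    query: 'SELECT event_ts, COUNT(*) AS num_events FROM PCOLLECTION GROUP BY event_ts'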

However, if you want to add a timestamp column in addition to the
original CSV records, there are multiple ways to achieve that.

1) MapToFields:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
[Your timestamp column could be a callable that gets the current
timestamp for each record; there's a sketch of this after the list below]

2) If you need an extra layer of transformation complexity, I would
recommend creating a custom transform:

# - type: MyCustomTransform
#   name: AddDateTimeColumn
#   config:
#     prefix: 'whatever'

providers:
  - type: 'javaJar'
    config:
      jar: 'gs://path/of/the/java.jar'
    transforms:
      MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Here is a good example of how to do that in Java:
https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java
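
Coming back to option 1, a minimal MapToFields sketch could look like this
(untested; the field name ingest_ts, the use of time.time(), and append:
true to keep the original CSV columns are all just illustrative):

- type: MapToFields
  config:
    language: python
    append: true
    fields:
      ingest_ts:
        callable: |
          def add_timestamp(row):
            # current wall-clock time for each record
            import time
            return time.time()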

Best,
Ferran

On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe () wrote:
>
> Hi all,
> I'm quite new to using Beam YAML. I am working with a CSV file and want to
> implement some windowing logic on it.
> I was wondering what the right way is to add timestamps to each element,
> assuming I have a column containing a timestamp.
>
> I am aware of the relevant part of the Beam Programming Guide (apache.org),
> but I'm not sure how this can be implemented and used from a YAML perspective.
>
> Thanks
> Yarden


Re: KafkaIO Parameter Issue | Runtime PipelineOptions | Apache Beam

2023-08-05 Thread Ferran Fernández Garrido
Hi Himanshu,

I would suggest taking a look at this:

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

When you create a regular (classic) template, it's a pre-configured
pipeline. That's useful if you always want to apply the same processing
logic. However, that doesn't seem to be the case for you. Flex Templates,
on the other hand, are an extension that offers the flexibility to
parameterize the pipeline code. For the topic, for instance, you can
simply use a String, and there's no need for ValueProviders.

Best,

Ferran



On Sat, Aug 5, 2023 at 7:55, himanshu singhal <
singhal.himansh...@gmail.com> wrote:

> Hello Beam Team,
>
> I am using Apache Beam to read from Kafka with KafkaIO on the Dataflow
> runner. I am facing an issue making the KafkaIO parameters (such as
> config and topics) dynamic. When I make these parameters PipelineOptions
> using RuntimeValueProvider, my Dataflow template is not created and I get
> errors saying that KafkaIO does not support RuntimeValueProvider. Can you
> please suggest or share any sample code for making these parameters
> PipelineOptions?
>
>
>
> Thanks & Regards
> *Himanshu Singhal*
> M:- +917821076244
> E:- singhal.himansh...@gmail.com
>