default_sdk_harness_log_level multi-language support
Hi! I am trying to adjust the log level for a Beam YAML pipeline that uses Kafka. Since Kafka is a multi-language transform, default_sdk_harness_log_level is not currently supported, which means I can't limit its logs (Kafka can be a bit too verbose sometimes). This can lead to increased logging costs when running the pipeline on a cloud service such as Dataflow.

Is there any initiative at the moment to support default_sdk_harness_log_level in multi-language transforms? Or some workaround? I suppose that, in terms of Beam YAML, we could pass these options to the schema providers somehow. However, that solution doesn't seem ideal, as it would not address the issue for people using multi-language transforms outside of Beam YAML.

Thanks!
Ferran
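For context, this is a sketch of how the option is passed to a Python-only pipeline today (per the message above, the setting does not currently reach the Java harness that the Kafka cross-language transform runs in; --sdk_harness_log_level_overrides is assumed here as the per-logger companion option, and the file names and project flags are placeholders):

```
python3 -m apache_beam.yaml.main \
  --pipeline_spec_file=pipeline.yml \
  --runner=DataflowRunner \
  --default_sdk_harness_log_level=WARNING \
  --sdk_harness_log_level_overrides='{"org.apache.kafka": "ERROR"}'
```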
Re: [YAML] ReadFromKafka with yaml
Hi Yarden,

If you are using Dataflow as a runner, you can already use ReadFromKafka (introduced in version 2.52). Dataflow will handle the expansion service automatically, so you don't have to do anything.

If you want to run it locally for development purposes, you'll have to build the Docker image. Check out the project and run:

  ./gradlew :sdks:java:container:java8:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest

(DOCKER_ROOT -> your Docker repository location.)

Then, if you want to run your custom Docker image in Dataflow, build the Python SDK (python setup.py sdist, which produces apache-beam-2.53.0.dev0.tar.gz) and, in case you've changed something in KafkaIO, build the expansion service that Kafka uses:

  ./gradlew :sdks:java:io:expansion-service:build

Finally, launch the pipeline:

  python3 -m apache_beam.yaml.main \
    --runner=DataflowRunner \
    --project=project_id \
    --region=region \
    --temp_location=temp_location \
    --pipeline_spec_file=yaml_pipeline.yml \
    --staging_location=staging_location \
    --sdk_location="path/apache-beam-2.53.0.dev0.tar.gz" \
    --sdk_harness_container_image_overrides=".*java.*,$DOCKER_ROOT:latest" \
    --streaming

This is an example of how to read JSON events from Kafka in Beam YAML:

  - type: ReadFromKafka
    config:
      topic: 'TOPIC_NAME'
      format: JSON
      bootstrap_servers: 'BOOTSTRAP_SERVERS'
      schema: 'JSON_SCHEMA'

Best,
Ferran

On Wed, Jan 10, 2024 at 14:11, Yarden BenMoshe () wrote:
>
> Hi,
>
> I am trying to consume a Kafka topic using the ReadFromKafka transform.
>
> If I got it right, since ReadFromKafka is originally written in Java, an
> expansion service is needed and the default environment is set to DOCKER. In
> the current implementation I can see that the expansion service field is not
> adjustable (I'm not able to pass it as part of the transform's config).
> Is there currently a way to use ReadFromKafka from a pipeline written with
> the YAML API? If so, an explanation would be much appreciated.
>
> I saw there's a workaround suggested online of using Docker-in-Docker, but I
> would prefer to avoid it.
>
> Thanks
> Yarden
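To make the ReadFromKafka snippet above self-contained, a minimal complete pipeline file could look like the sketch below (LogForTesting is used only as a placeholder sink; the topic, server, and schema values are placeholders to fill in):

```yaml
pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadJsonEvents
      config:
        topic: 'TOPIC_NAME'
        format: JSON
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'JSON_SCHEMA'
    - type: LogForTesting
      input: ReadJsonEvents
```

This is the file you would pass via --pipeline_spec_file when launching with apache_beam.yaml.main.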
Re: [YAML] add timestamp to a bounded PCollection
Hi Yarden,

Since it's a bounded source, you could try an SQL transform grouping by the timestamp column. Here are some examples of grouping: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml

However, if you want to add a timestamp column in addition to the original CSV records, there are multiple ways to achieve that:

1) MapToFields: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md [your timestamp column could be a callable that gets the current timestamp for each record]

2) If you need an extra layer of transformation complexity, I would recommend creating a custom transform:

  # - type: MyCustomTransform
  #   name: AddDateTimeColumn
  #   config:
  #     prefix: 'whatever'
  providers:
    - type: 'javaJar'
      config:
        jar: 'gs://path/of/the/java.jar'
      transforms:
        MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Here's a good example of how to do that in Java: https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java

Best,
Ferran

On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe () wrote:
>
> Hi all,
> I'm quite new to using Beam YAML. I am working with a CSV file and want to
> implement some windowing logic on it.
> I was wondering what the right way is to add timestamps to each element,
> assuming I have a column containing a timestamp.
>
> I am aware of the Beam Programming Guide (apache.org) part, but I'm not sure
> how this can be implemented and used from the YAML perspective.
>
> Thanks
> Yarden
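A sketch of option 1), assuming the CSV has been read into rows with a timestamp column named ts_str (the column and output field names here are made up for illustration; the expression syntax and append behavior follow the yaml_mapping docs linked above):

```yaml
- type: MapToFields
  name: AddDateTimeColumn
  config:
    language: python
    append: true       # keep all original CSV fields
    fields:
      event_time: "float(ts_str)"   # derive a numeric timestamp field
```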
Re: KafkaIO Parameter Issue | Runtime PipelineOptions | Apache Beam
Hi Himanshu,

I would suggest taking a look at this: https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

When you create a regular template, it's a pre-configured pipeline. That's useful if you always want to apply the same data processing logic. However, that doesn't seem to be the case for you. Flex Templates, on the other hand, are an extension that offers the flexibility to parameterize the pipeline code. For the topic, for instance, you can simply use Strings, and there's no need for ValueProviders.

Best,
Ferran

On Sat, Aug 5, 2023 at 7:55, himanshu singhal <singhal.himansh...@gmail.com> wrote:

> Hello Beam Team,
>
> I am using Apache Beam to read from Kafka using KafkaIO on the Dataflow
> runner. I am facing an issue making the KafkaIO parameters (e.g. config,
> topics) dynamic. When I make these parameters PipelineOptions using
> RuntimeValueProvider, my Dataflow template is not created, and I get errors
> saying that KafkaIO does not support RuntimeValueProvider. Can you please
> suggest or share any sample code for making these parameters PipelineOptions?
>
> Thanks & Regards
> Himanshu Singhal
> M: +917821076244
> E: singhal.himansh...@gmail.com
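For illustration, a sketch of a Flex Template metadata file declaring the Kafka topic as a plain string parameter (the names and help text are made up; see the Flex Templates guide linked above for the authoritative metadata schema):

```json
{
  "name": "kafka-ingest",
  "description": "Reads from a Kafka topic",
  "parameters": [
    {
      "name": "topic",
      "label": "Kafka topic",
      "helpText": "The Kafka topic to read from",
      "isOptional": false
    }
  ]
}
```

At launch time the parameter arrives as an ordinary pipeline option string, so KafkaIO receives a concrete value and no RuntimeValueProvider is involved.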