Re: KafkaIO metric publishing

2024-06-19 Thread XQ Hu via user
Is your job a Dataflow Template job?

The error is caused by
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java#L55
.

So basically DataflowTemplateJob does not support metrics.


On Wed, Jun 19, 2024 at 3:57 AM Lahiru Ginnaliya Gamathige <
glah...@gmail.com> wrote:

> Hi Users,
>
> In Google Cloud Monitoring there is a limit of 100 metrics, and when we are
> using KafkaIO, the library publishes a bunch of metrics per topic. With our
> usage we will easily run out of the 100-metric limit.
>
> We want to stop KafkaIO from publishing metrics, and I do not see that this
> is configurable. So I am trying to write metric-filtering logic (we are
> using Beam version 2.55.1).
> I wrote a Sink, but when I try to find a way to register the sink, I cannot
> see a way to do the following in this Beam version:
>
> *MetricsEnvironment.setMetricsSink(new
> CustomMetricsSink(options.getProject()));*
>
> Then I tried to register it like this,
>
> PipelineResult results = run(options);
> results.waitUntilFinish();
>
>
>
> *   MetricQueryResults metricQueryResults =
> results.metrics().queryMetrics(MetricsFilter.builder().build());
> CustomMetricSink reporter = new CustomMetricSink(options.getProject());
> reporter.writeMetrics(metricQueryResults);*
>
> With the above code, the pipeline fails to start with the error
> (java.lang.UnsupportedOperationException:
> The result of template creation should not be used.)
>
>
> Do you suggest another solution for this problem (it sounds like quite a
> common problem when using KafkaIO)? Or do you have any suggestions about my
> attempts?
>
> Regards
> Lahiru
>
>


Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread XQ Hu via user
XQ Hu  wrote:
>>>>
>>>>> Any reason to use this?
>>>>>
>>>>> RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
>>>>>  pandas-datareader==0.9.0
>>>>>
>>>>> It is typically recommended to use the latest Beam and build the
>>>>> docker image using the requirements released for each Beam, for example,
>>>>> https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt
>>>>>
>>>>> On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World 
>>>>> wrote:
>>>>>
>>>>>> Sure, apologies, it crossed my mind it would have been useful to
>>>>>> refert to it
>>>>>>
>>>>>> so this is the docker file
>>>>>>
>>>>>>
>>>>>> https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester
>>>>>>
>>>>>> I was using a setup.py as well, but then i commented out the usage in
>>>>>> the dockerfile after checking some flex templates which said it is not
>>>>>> needed
>>>>>>
>>>>>>
>>>>>> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py
>>>>>>
>>>>>> thanks in advance
>>>>>>  Marco
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:
>>>>>>
>>>>>>> Can you share your Dockerfile?
>>>>>>>
>>>>>>> On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> thanks all,  it seemed to work but now i am getting a different
>>>>>>>> problem, having issues in building pyarrow...
>>>>>>>>
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>:36: DeprecationWarning: pkg_resources is deprecated as an 
>>>>>>>> API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>WARNING setuptools_scm.pyproject_reading toml section missing 
>>>>>>>> 'pyproject.toml does not contain a tool.setuptools_scm section'
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>Traceback (most recent call last):
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>  File 
>>>>>>>> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>>>>>>>>  line 36, in read_pyproject
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>section = defn.get("tool", {})[tool_name]
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>  ^^^
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>KeyError: 'setuptools_scm'
>>>>>>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
>>>>>>>>running bdist_wheel
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> It is somehow getting messed up with a toml ?
>>>>>>>>
>>>>>>>>
>>>>>>>> Could anyone advise?
>>>>>>>>
>>>>>>>> thanks
>>>>>>>>
>>>>>>>>  Marco
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
>>>>>>>>> is a great example.
>>>>>>>>>
>>>>>>>>> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> In this case the Python version will be defined by the Python
>>>>>>>>>> version installed in the docker image of your flex template. So, 
>>>>>>>>>> you'd
>>>>>>>>>> have to build your flex template from a base image with Python 3.11.
>>>>>>>>>>
>>>>>>>>>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World <
>>>>>>>>>> mmistr...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello
>>>>>>>>>>>  no i am running my pipelien on  GCP directly via a flex
>>>>>>>>>>> template, configured using a Docker file
>>>>>>>>>>> Any chances to do something in the Dockerfile to force the
>>>>>>>>>>> version at runtime?
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> Are you running your pipeline from the python 3.11
>>>>>>>>>>>> environment?  If you are running from a python 3.11 environment 
>>>>>>>>>>>> and don't
>>>>>>>>>>>> use a custom docker container image, DataflowRunner(Assuming 
>>>>>>>>>>>> Apache Beam on
>>>>>>>>>>>> GCP means Apache Beam on DataflowRunner), will use Python 3.11.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Anand
>>>>>>>>>>>>
>>>>>>>>>>>


Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-12 Thread XQ Hu via user
Any reason to use this?

RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
 pandas-datareader==0.9.0

It is typically recommended to use the latest Beam and build the docker
image using the requirements released for each Beam, for example,
https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt

On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World  wrote:

> Sure, apologies, it crossed my mind it would have been useful to refert to
> it
>
> so this is the docker file
>
>
> https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester
>
> I was using a setup.py as well, but then i commented out the usage in the
> dockerfile after checking some flex templates which said it is not needed
>
>
> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py
>
> thanks in advance
>  Marco
>
>
>
>
>
>
>
> On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:
>
>> Can you share your Dockerfile?
>>
>> On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
>> wrote:
>>
>>> thanks all,  it seemed to work but now i am getting a different problem,
>>> having issues in building pyarrow...
>>>
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> :36: DeprecationWarning: pkg_resources is deprecated as an API. See 
>>> https://setuptools.pypa.io/en/latest/pkg_resources.html
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> WARNING setuptools_scm.pyproject_reading toml section missing 
>>> 'pyproject.toml does not contain a tool.setuptools_scm section'
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> Traceback (most recent call last):
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
>>> File 
>>> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>>>  line 36, in read_pyproject
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
>>>   section = defn.get("tool", {})[tool_name]
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
>>> ~~~~~~~~^^^^^^^
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> KeyError: 'setuptools_scm'
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> running bdist_wheel
>>>
>>>
>>>
>>>
>>> It is somehow getting messed up with a toml ?
>>>
>>>
>>> Could anyone advise?
>>>
>>> thanks
>>>
>>>  Marco
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user 
>>> wrote:
>>>
>>>>
>>>> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
>>>> is a great example.
>>>>
>>>> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> In this case the Python version will be defined by the Python version
>>>>> installed in the docker image of your flex template. So, you'd have to
>>>>> build your flex template from a base image with Python 3.11.
>>>>>
>>>>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
>>>>> wrote:
>>>>>
>>>>>> Hello
>>>>>>  no i am running my pipelien on  GCP directly via a flex template,
>>>>>> configured using a Docker file
>>>>>> Any chances to do something in the Dockerfile to force the version at
>>>>>> runtime?
>>>>>> Thanks
>>>>>>
>>>>>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Are you running your pipeline from the python 3.11 environment?  If
>>>>>>> you are running from a python 3.11 environment and don't use a custom
>>>>>>> docker container image, DataflowRunner(Assuming Apache Beam on GCP means
>>>>>>> Apache Beam on DataflowRunner), will use Python 3.11.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anand
>>>>>>>
>>>>>>


Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-11 Thread XQ Hu via user
Can you share your Dockerfile?

On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World  wrote:

> thanks all,  it seemed to work but now i am getting a different problem,
> having issues in building pyarrow...
>
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> :36: DeprecationWarning: pkg_resources is deprecated as an API. See 
> https://setuptools.pypa.io/en/latest/pkg_resources.html
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> WARNING setuptools_scm.pyproject_reading toml section missing 'pyproject.toml 
> does not contain a tool.setuptools_scm section'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> Traceback (most recent call last):
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
> File 
> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>  line 36, in read_pyproject
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> section = defn.get("tool", {})[tool_name]
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>   ^^^
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> KeyError: 'setuptools_scm'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> running bdist_wheel
>
>
>
>
> It is somehow getting messed up with a toml ?
>
>
> Could anyone advise?
>
> thanks
>
>  Marco
>
>
>
>
>
> On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user 
> wrote:
>
>>
>> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
>> is a great example.
>>
>> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
>> user@beam.apache.org> wrote:
>>
>>> In this case the Python version will be defined by the Python version
>>> installed in the docker image of your flex template. So, you'd have to
>>> build your flex template from a base image with Python 3.11.
>>>
>>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
>>> wrote:
>>>
>>>> Hello
>>>>  no i am running my pipelien on  GCP directly via a flex template,
>>>> configured using a Docker file
>>>> Any chances to do something in the Dockerfile to force the version at
>>>> runtime?
>>>> Thanks
>>>>
>>>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Are you running your pipeline from the python 3.11 environment?  If
>>>>> you are running from a python 3.11 environment and don't use a custom
>>>>> docker container image, DataflowRunner(Assuming Apache Beam on GCP means
>>>>> Apache Beam on DataflowRunner), will use Python 3.11.
>>>>>
>>>>> Thanks,
>>>>> Anand
>>>>>
>>>>


Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-10 Thread XQ Hu via user
https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
is a great example.

On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
user@beam.apache.org> wrote:

> In this case the Python version will be defined by the Python version
> installed in the docker image of your flex template. So, you'd have to
> build your flex template from a base image with Python 3.11.
>
> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
> wrote:
>
>> Hello
>>  no i am running my pipelien on  GCP directly via a flex template,
>> configured using a Docker file
>> Any chances to do something in the Dockerfile to force the version at
>> runtime?
>> Thanks
>>
>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>> user@beam.apache.org> wrote:
>>
>>> Hello,
>>>
>>> Are you running your pipeline from the python 3.11 environment?  If you
>>> are running from a python 3.11 environment and don't use a custom docker
>>> container image, DataflowRunner(Assuming Apache Beam on GCP means Apache
>>> Beam on DataflowRunner), will use Python 3.11.
>>>
>>> Thanks,
>>> Anand
>>>
>>


Re: Beam + VertexAI

2024-06-09 Thread XQ Hu via user
If you have a Vertex AI model, try
https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
If you want to use the Vertex AI model to do text embedding, try
https://cloud.google.com/dataflow/docs/notebooks/vertex_ai_text_embeddings
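
For reference, a rough RunInference sketch against a deployed Vertex AI
endpoint (untested; the endpoint ID, project, region, and prompt text below are
placeholders rather than values from this thread, and the request payload must
match whatever schema your endpoint actually expects):

import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON

# Placeholder endpoint details; swap in your own deployed Vertex AI endpoint.
model_handler = VertexAIModelHandlerJSON(
    endpoint_id="1234567890",
    project="my-project",
    location="us-central1",
)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(["Summarize the main point of this news article: ..."])
        | RunInference(model_handler)  # yields PredictionResult elements
        | beam.Map(print)
    )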

On Sun, Jun 9, 2024 at 4:40 AM Sofia’s World  wrote:

> HI all
>  i am looking for samples of integrating VertexAI into apache beam..
> As sample, i want to create a pipeline that retrieves some news
> information and will invoke
> VertexAI to summarize the main point of every news...
>
> Could you anyone give me some pointers?
> Kind regards
>  marco
>


Re: Question: Pipelines Stuck with Java 21 and BigQuery Storage Write API

2024-06-03 Thread XQ Hu via user
This is probably related to the strict encapsulation that is enforced with Java 21.
Using `--add-opens=java.base/java.lang=ALL-UNNAMED` as a JVM flag could be
a temporary workaround.

On Mon, Jun 3, 2024 at 3:04 AM 田中万葉  wrote:

> Hi all,
>
> I encountered an UnsupportedOperationException when using Java 21 and the
> BigQuery Storage Write API in a Beam pipeline by using
> ".withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));"
>
> Having read issue #28120[1] and understanding that Beam version 2.52.0 or
> later supports Java 21 as a runtime, I wonder why such an error happens.
>
> I found there are two workarounds, but the Storage Write API is a more
> preferable way to insert data into BigQuery, so I'd like to find a
> solution.
>
> 1. One workaround is to switch from Java 21 to Java 17 (openjdk version
> "17.0.10" 2024-01-16). By changing the Java version properties
> in the pom.xml file (i.e., without modifying
> App.java itself), the pipeline successfully writes data to my destination
> table on BigQuery. It seems Java 17 and the BigQuery Storage Write API work
> fine.
> 2. The other workaround is to change the insert method. I tried the BigQuery
> legacy streaming API(
> https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery )
> instead of the Storage Write API. Even though I still used Java 21, when I
> changed my code to
> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));, I did not
> encounter the error.
>
> So I faced the error only when using Java 21 and BigQuery Storage Write
> API.
>
> I uploaded the code below to reproduce. Could you please inform me how to
> handle this issue?
> https://github.com/cloud-ace/min-reproduce
>
> My Environment
> - OS
>   - Ubuntu 22.04
>   - Mac OS Sonoma(14.3.1)
> - beam 2.53.0, 2.54.0
> - openjdk version "21.0.2" 2024-01-16
> - maven 3.9.6
> - DirectRunner
>
> Thanks,
>
> Kazuha
>
> [1]: https://github.com/apache/beam/issues/28120
>
> Here is the detailed error message.
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: Cannot define class using
> reflection: Unable to make protected java.lang.Package
> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
> java.base does not "opens java.lang" to unnamed module @116d5dff
>
> Caused by: java.lang.UnsupportedOperationException: Cannot define class
> using reflection: Unable to make protected java.lang.Package
> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
> java.base does not "opens java.lang" to unnamed module @116d5dff
> at
> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection$Dispatcher$Initializable$Unavailable.defineClass
> (ClassInjector.java:472)
> at
> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection.injectRaw
> (ClassInjector.java:284)
> at net.bytebuddy.dynamic.loading.ClassInjector$AbstractBase.inject
> (ClassInjector.java:118)
> at
> net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default$InjectionDispatcher.load
> (ClassLoadingStrategy.java:241)
> at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default.load
> (ClassLoadingStrategy.java:148)
> at net.bytebuddy.dynamic.TypeResolutionStrategy$Passive.initialize
> (TypeResolutionStrategy.java:101)
> at net.bytebuddy.dynamic.DynamicType$Default$Unloaded.load
> (DynamicType.java:6317)
> at
> org.apache.beam.sdk.schemas.utils.AutoValueUtils.createBuilderCreator
> (AutoValueUtils.java:247)
> at org.apache.beam.sdk.schemas.utils.AutoValueUtils.getBuilderCreator
> (AutoValueUtils.java:225)
> at org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator
> (AutoValueSchema.java:122)
> at org.apache.beam.sdk.schemas.CachingFactory.create
> (CachingFactory.java:56)
> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
> (FromRowUsingCreator.java:94)
> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
> (FromRowUsingCreator.java:45)
> at org.apache.beam.sdk.schemas.SchemaCoder.decode
> (SchemaCoder.java:126)
> at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
> at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
> at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
> at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream
> (CoderUtils.java:142)
> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray
> (CoderUtils.java:102)
> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray
> (CoderUtils.java:96)
> at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:168)
> at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.
> (MutationDetectors.java:118)
> at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder
> (MutationDetectors.java:49)
> at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
> (ImmutabilityCheckingBundleFactory.java:115)
> at
> 

Re: Query about auto-inference of numPartitions for `JdbcIO#readWithPartitions`

2024-05-31 Thread XQ Hu via user
You should be able to configure the number of partitions like this:

https://github.com/GoogleCloudPlatform/dataflow-cookbook/blob/main/Java/src/main/java/jdbc/ReadPartitionsJdbc.java#L132

The code to auto-infer the number of partitions seems to be unreachable (I
haven't checked this carefully). More details are here:
https://issues.apache.org/jira/browse/BEAM-12456

On Fri, May 31, 2024 at 7:40 AM Vardhan Thigle via user <
user@beam.apache.org> wrote:

> Hi Beam Experts, I have a small query about `JdbcIO#readWithPartitions`.
>
>
> Context
>
> JdbcIO#readWithPartitions seems to always default to 200 partitions
> (DEFAULT_NUM_PARTITIONS). This is set by default when the object is
> constructed. There seems to be no way to override this with a null value.
> Hence it seems that the code that checks for a null value and tries to
> auto-infer the number of partitions never runs. I am trying to use this for
> reading a tall table of unknown size, and the pipeline always defaults to
> 200 if the value is not set. The default of 200 seems to fall short, as a
> worker goes out of memory in the reshuffle stage. Running with a higher
> number of partitions like 4K helps for my test setup. Since the size is not
> known at the time of implementing the pipeline, the auto-inference might
> help set maxPartitions to a reasonable value as per the heuristic decided by
> the Beam code.
> Request for help
>
> Could you please clarify a few doubts around this?
>
>    1. Is this behavior intentional?
>    2. Could you please explain the rationale behind the heuristic in L1398
>    and DEFAULT_NUM_PARTITIONS=200?
>
>
> I have also raised this as issues/31467 in case it needs any changes in
> the implementation.
>
>
> Regards and Thanks,
> Vardhan Thigle,
> +919535346204 <+91%2095353%2046204>
>


Re: Error handling for GCP Pub/Sub on Dataflow using Python

2024-05-25 Thread XQ Hu via user
I do not suggest you handle this in beam.io.WriteToPubSub. You could change
your pipeline to add one transform to check the message size. If it is
beyond 10 MB, you could use another sink or process the message to reduce
the size.
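
Roughly, the size check could look like this (an untested sketch; the names,
the 10 MB threshold constant, and the placeholder sinks are assumptions, and in
a streaming job the oversized branch would still need windowing before a
file-based sink such as beam.io.fileio.WriteToFiles on Cloud Storage):

import apache_beam as beam
from apache_beam import pvalue

MAX_PUBSUB_BYTES = 10 * 1024 * 1024  # Pub/Sub's per-message size limit


class RouteBySize(beam.DoFn):
    """Tags payloads that are too big for Pub/Sub so they go to a fallback sink."""

    def process(self, element: bytes):
        if len(element) > MAX_PUBSUB_BYTES:
            yield pvalue.TaggedOutput("too_large", element)
        else:
            yield element


def route_by_size(payloads):
    # payloads: PCollection[bytes] that would otherwise go straight to WriteToPubSub.
    routed = payloads | "RouteBySize" >> beam.ParDo(RouteBySize()).with_outputs(
        "too_large", main="ok")
    # routed.ok        -> beam.io.WriteToPubSub(topic=...)
    # routed.too_large -> apply windowing, then write to Cloud Storage instead
    return routed.ok, routed.too_large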

On Fri, May 24, 2024 at 3:46 AM Nimrod Shory  wrote:

> Hello group,
> I am pretty new to Dataflow and Beam.
> I have deployed a Dataflow streaming job using Beam with Python.
> The final step of my pipeline is publishing a message to Pub/Sub.
> In certain cases the message can become too big for Pub/Sub (larger than
> the allowed 10MB) and in that case of failure, it just retries to publish
> indefinitely, causing the Job to eventually stop processing new data.
>
> My question is, is there a way to handle failures in beam.io.WriteToPubSub
> or should I implement a similar method myself?
>
> Ideally, I would like to write the too large messages to a file on cloud
> storage.
>
> Any ideas will be appreciated.
>
> Thanks in advance for your help!
>
>


Re: Question: Java Apache Beam, mock external Clients initialized in Setup

2024-05-25 Thread XQ Hu via user
I am not sure which part you want to test. If the processData part should
be tested, you could refactor the code without use any Beam specific code
and test the processing data logic.

From your example, it seems that you are calling some APIs. We recently
added a new Web API IO:
https://beam.apache.org/documentation/io/built-in/webapis/,
which provides a way to test them.

On Wed, May 22, 2024 at 5:06 PM Ritwik Dutta via dev 
wrote:

> any response yet? No one has answers? I left a stackoverflow bounty on the
> question
>
> Using external methods is pretty important
>
> On Sunday, May 12, 2024 at 11:52:25 AM PDT, Ritwik Dutta <
> rdutt...@yahoo.com> wrote:
>
>
> Hi,
> I wrote the following question here.
> It would be really helpful also, if you can also update your documentation
> on Using Test Fakes in different Situations. It was very light
> documentation. Please provide more explanation and examples.
> https://beam.apache.org/documentation/io/testing/#:~:text=non%2DBeam%20client.-,Use%20fakes,-Instead%20of%20using
>
>
> *Question: *Java Apache Beam, mock external Clients initialized in @Setup
> method of DoFn with Constructors variables
>
> https://stackoverflow.com/questions/78468953/java-apache-beam-mock-external-clients-initialized-in-setup-method-of-dofn-wit
>
> Thanks,
>
> -Ritwik Dutta
>  734-262-4285 <(734)%20262-4285>
>


Re: Fails to deploy a python pipeline to a flink cluster

2024-05-11 Thread XQ Hu via user
Do you still have the same issue? I tried to follow your setup.sh to
reproduce this but somehow I am stuck at the word_len step. I saw you also
tried to use `print(kafka_kv)` to debug it. I am not sure about your
current status.

On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim  wrote:

> Hello,
>
> I'm playing with deploying a python pipeline to a flink cluster on
> kubernetes via flink kubernetes operator. The pipeline simply calculates
> average word lengths in a fixed time window of 5 seconds and it works with
> the embedded flink cluster.
>
> First, I created a k8s cluster (v1.25.3) on minikube and a docker image
> named beam-python-example:1.17 created using the following docker file -
> the full details can be checked in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>
> The java sdk is used for the sdk harness of the kafka io's expansion
> service while the job server is used to execute the python pipeline in the
> flink operator.
>
> FROM flink:1.17
> ...
> ## add java SDK and job server
> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/
> /opt/apache/beam/
>
> COPY --from=apache/beam_flink1.17_job_server:2.56.0  \
>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar
> /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>
> RUN chown -R flink:flink /opt/apache/beam
>
> ## install python 3.10.13
> RUN apt-get update -y && \
>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
> libffi-dev liblzma-dev && \
>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-
> ${PYTHON_VERSION}.tgz && \
> ...
> ## install apache beam 2.56.0
> RUN pip3 install apache-beam==${BEAM_VERSION}
>
> ## copy pipeline source
> RUN mkdir /opt/flink/app
> COPY word_len.py /opt/flink/app/
>
> Then the pipeline is deployed using the following manifest - the full
> details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: beam-word-len
> spec:
>   image: beam-python-example:1.17
>   imagePullPolicy: Never
>   flinkVersion: v1_17
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "5"
>   serviceAccount: flink
>   podTemplate:
> spec:
>   containers:
> - name: flink-main-container
>   env:
> - name: BOOTSTRAP_SERVERS
>   value: demo-cluster-kafka-bootstrap:9092
> ...
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> replicas: 2
> resource:
>   memory: "2048m"
>   cpu: 1
> podTemplate:
>   spec:
> containers:
>   - name: python-worker-harness
> image: apache/beam_python3.10_sdk:2.56.0
> imagePullPolicy: Never
> args: ["--worker_pool"]
> ports:
>   - containerPort: 5
>
>   job:
> jarURI:
> local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
> entryClass:
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
> args:
>   - "--driver-cmd"
>   - "python /opt/flink/app/word_len.py --deploy"
> parallelism: 3
> upgradeMode: stateless
>
> Here is the pipeline source - the full details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>
> When I add the --deploy flag, the python sdk harness is set to EXTERNAL
> and its config is set to localhost:5 - I believe it'll point to the
> side car container of the task manager. For the kafka io, the expansion
> service's sdk harness is configured as PROCESS and the command points to
> the java sdk that is added in the beam-python-example:1.17 image.
>
> ...
> def run(args=None):
> parser = argparse.ArgumentParser(description="Beam pipeline arguments"
> )
> parser.add_argument("--runner", default="FlinkRunner", help="Apache
> Beam runner")
> parser.add_argument(
> "--deploy",
> action="store_true",
> default="Flag to indicate whether to use an own local cluster",
> )
> opts, _ = parser.parse_known_args(args)
>
> pipeline_opts = {
> "runner": opts.runner,
> "job_name": "avg-word-length-beam",
> "streaming": True,
> "environment_type": "EXTERNAL" if opts.deploy is True else
> "LOOPBACK",
> "checkpointing_interval": "6",
> }
>
> expansion_service = None
> if pipeline_opts["environment_type"] == "EXTERNAL":
> pipeline_opts = {
> **pipeline_opts,
> **{
> "environment_config": "localhost:5",
> "flink_submit_uber_jar": True,
> },
> }
> expansion_service = kafka.default_io_expansion_service(
> append_args=[
> "--defaultEnvironmentType=PROCESS",
>
> '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
> 

Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-05 Thread XQ Hu via user
I added this issue here
https://github.com/apache/beam/issues/24528#issuecomment-2095026324
But we do not plan to fix this for Python DirectRunner since Prism will
become the default local runner when it is ready.

On Sun, May 5, 2024 at 2:41 PM Jaehyeon Kim  wrote:

> Hi XQ
>
> Yes, it works with the FlinkRunner. Thank you so much!
>
> Cheers,
> Jaehyeon
>
> [image: image.png]
>
> On Mon, 6 May 2024 at 02:49, XQ Hu via user  wrote:
>
>> Have you tried to use other runners? I think this might be caused by some
>> gaps in the Python DirectRunner's support for streaming cases or SDFs.
>>
>> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim  wrote:
>>
>>> Hi XQ
>>>
>>> Thanks for checking it out. SDFs chaining seems to work as I created my
>>> pipeline while converting a pipeline that is built in the Java SDK. The
>>> source of the Java pipeline can be found in
>>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>>
>>> So far, when I yield outputs, the second SDF gets stuck while it gets
>>> executed if I return them (but the first SDF completes). If I change the
>>> second SDF into a do function without adding the tracker, it is executed
>>> fine. Not sure what happens in the first scenario.
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> On Sun, 5 May 2024 at 09:21, XQ Hu via user 
>>> wrote:
>>>
>>>> I played with your example. Indeed, create_tracker in
>>>> your ProcessFilesFn is never called, which is quite strange.
>>>> I could not find any example that shows the chained SDFs, which makes
>>>> me wonder whether the chained SDFs work.
>>>>
>>>> @Chamikara Jayalath  Any thoughts?
>>>>
>>>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am building a pipeline using two SDFs that are chained. The first
>>>>> function (DirectoryWatchFn) checks a folder continuously and grabs if a 
>>>>> new
>>>>> file is added. The second one (ProcessFilesFn) processes a file
>>>>> while splitting each line - the processing simply prints the file name and
>>>>> line number.
>>>>>
>>>>> The process function of the first SDF gets stuck if I yield a new file
>>>>> object. Specifically, although the second SDF is called as I can check the
>>>>> initial restriction is created, the tracker is not created at all!
>>>>>
>>>>> On the other hand, if I return the file object list, the second SDF
>>>>> works fine but the issue is the first SDF stops as soon as it returns the
>>>>> first list of files.
>>>>>
>>>>> The source of the pipeline can be found in
>>>>> - First SDF:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>>>> - Second SDF:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>>>> - Pipeline:
>>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>>>
>>>>> Can you please inform me how to handle this issue?
>>>>>
>>>>> Cheers,
>>>>> Jaehyeon
>>>>>
>>>>> class DirectoryWatchFn(beam.DoFn):
>>>>> POLL_TIMEOUT = 10
>>>>>
>>>>> @beam.DoFn.unbounded_per_element()
>>>>> def process(
>>>>> self,
>>>>> element: str,
>>>>> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>>> DirectoryWatchRestrictionProvider()
>>>>> ),
>>>>> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
>>>>> WatermarkEstimatorParam(
>>>>> DirectoryWatchWatermarkEstimatorProvider()
>>>>> ),
>>>>> ) -> typing.Iterable[MyFile]:
>>>>> new_files = self._get_new_files_if_any(element, tracker)
>>>>> if self._process_new_files(tracker, watermark_estimater,
>>>>> new_files):
>>>>> # return [new_file[0] for new_file 

Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-05 Thread XQ Hu via user
Have you tried to use other runners? I think this might be caused by some
gaps in the Python DirectRunner's support for streaming cases or SDFs.
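
If it helps, switching the local runner is just a pipeline option. A minimal
sketch (assuming a recent Beam release, where FlinkRunner with no flink_master
set runs against a local embedded Flink cluster):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Same pipeline code, different runner: the portable Flink runner instead of
# the Python DirectRunner. Java must be available locally, and Beam may
# download the Flink job server jar on first use.
options = PipelineOptions(["--runner=FlinkRunner"])
with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)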

On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim  wrote:

> Hi XQ
>
> Thanks for checking it out. SDFs chaining seems to work as I created my
> pipeline while converting a pipeline that is built in the Java SDK. The
> source of the Java pipeline can be found in
> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>
> So far, when I yield outputs, the second SDF gets stuck while it gets
> executed if I return them (but the first SDF completes). If I change the
> second SDF into a do function without adding the tracker, it is executed
> fine. Not sure what happens in the first scenario.
>
> Cheers,
> Jaehyeon
>
> On Sun, 5 May 2024 at 09:21, XQ Hu via user  wrote:
>
>> I played with your example. Indeed, create_tracker in your ProcessFilesFn
>> is never called, which is quite strange.
>> I could not find any example that shows the chained SDFs, which makes me
>> wonder whether the chained SDFs work.
>>
>> @Chamikara Jayalath  Any thoughts?
>>
>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:
>>
>>> Hello,
>>>
>>> I am building a pipeline using two SDFs that are chained. The first
>>> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
>>> file is added. The second one (ProcessFilesFn) processes a file
>>> while splitting each line - the processing simply prints the file name and
>>> line number.
>>>
>>> The process function of the first SDF gets stuck if I yield a new file
>>> object. Specifically, although the second SDF is called as I can check the
>>> initial restriction is created, the tracker is not created at all!
>>>
>>> On the other hand, if I return the file object list, the second SDF
>>> works fine but the issue is the first SDF stops as soon as it returns the
>>> first list of files.
>>>
>>> The source of the pipeline can be found in
>>> - First SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>> - Second SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>> - Pipeline:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>
>>> Can you please inform me how to handle this issue?
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> class DirectoryWatchFn(beam.DoFn):
>>> POLL_TIMEOUT = 10
>>>
>>> @beam.DoFn.unbounded_per_element()
>>> def process(
>>> self,
>>> element: str,
>>> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>> DirectoryWatchRestrictionProvider()
>>> ),
>>> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
>>> WatermarkEstimatorParam(
>>> DirectoryWatchWatermarkEstimatorProvider()
>>> ),
>>> ) -> typing.Iterable[MyFile]:
>>> new_files = self._get_new_files_if_any(element, tracker)
>>> if self._process_new_files(tracker, watermark_estimater,
>>> new_files):
>>> # return [new_file[0] for new_file in new_files] #<-- it
>>> doesn't get stuck but the SDF finishes
>>> for new_file in new_files: #<--- it gets stuck if yielding
>>> file objects
>>> yield new_file[0]
>>> else:
>>> return
>>> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>
>>> def _get_new_files_if_any(
>>> self, element: str, tracker: DirectoryWatchRestrictionTracker
>>> ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>> new_files = []
>>> for file in os.listdir(element):
>>> if (
>>> os.path.isfile(os.path.join(element, file))
>>> and file not in tracker.current_restriction().
>>> already_processed
>>> ):
>>> num_lines = sum(1 for _ in open(os.path.join(element,
>>> file)))
>>> new_file = MyFile(file, 0, num_lines)
>>> print(new_file)
>>> new_files.append

Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-04 Thread XQ Hu via user
I played with your example. Indeed, create_tracker in your ProcessFilesFn
is never called, which is quite strange.
I could not find any example that shows the chained SDFs, which makes me
wonder whether the chained SDFs work.

@Chamikara Jayalath  Any thoughts?

On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:

> Hello,
>
> I am building a pipeline using two SDFs that are chained. The first
> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
> file is added. The second one (ProcessFilesFn) processes a file
> while splitting each line - the processing simply prints the file name and
> line number.
>
> The process function of the first SDF gets stuck if I yield a new file
> object. Specifically, although the second SDF is called as I can check the
> initial restriction is created, the tracker is not created at all!
>
> On the other hand, if I return the file object list, the second SDF works
> fine but the issue is the first SDF stops as soon as it returns the first
> list of files.
>
> The source of the pipeline can be found in
> - First SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
> - Second SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
> - Pipeline:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>
> Can you please inform me how to handle this issue?
>
> Cheers,
> Jaehyeon
>
> class DirectoryWatchFn(beam.DoFn):
> POLL_TIMEOUT = 10
>
> @beam.DoFn.unbounded_per_element()
> def process(
> self,
> element: str,
> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
> DirectoryWatchRestrictionProvider()
> ),
> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
> WatermarkEstimatorParam(
> DirectoryWatchWatermarkEstimatorProvider()
> ),
> ) -> typing.Iterable[MyFile]:
> new_files = self._get_new_files_if_any(element, tracker)
> if self._process_new_files(tracker, watermark_estimater, new_files
> ):
> # return [new_file[0] for new_file in new_files] #<-- it
> doesn't get stuck but the SDF finishes
> for new_file in new_files: #<--- it gets stuck if yielding
> file objects
> yield new_file[0]
> else:
> return
> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>
> def _get_new_files_if_any(
> self, element: str, tracker: DirectoryWatchRestrictionTracker
> ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
> new_files = []
> for file in os.listdir(element):
> if (
> os.path.isfile(os.path.join(element, file))
> and file not in tracker.current_restriction().
> already_processed
> ):
> num_lines = sum(1 for _ in open(os.path.join(element, file
> )))
> new_file = MyFile(file, 0, num_lines)
> print(new_file)
> new_files.append(
> (
> new_file,
> Timestamp.of(os.path.getmtime(os.path.join(element,
> file))),
> )
> )
> return new_files
>
> def _process_new_files(
> self,
> tracker: DirectoryWatchRestrictionTracker,
> watermark_estimater: ManualWatermarkEstimator,
> new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
> ):
> max_instance = watermark_estimater.current_watermark()
> for new_file in new_files:
> if tracker.try_claim(new_file[0].name) is False:
> watermark_estimater.set_watermark(max_instance)
> return False
> if max_instance < new_file[1]:
> max_instance = new_file[1]
> watermark_estimater.set_watermark(max_instance)
> return max_instance < MAX_TIMESTAMP
>


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread XQ Hu via user
I am not sure you still need to do batching, since the Web API IO can handle
caching.

If you really need it, I think GroupIntoBatches is a good way to go.
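
If you go that way, a sketch of the keying idea (the shard count, batch size,
buffering timeout, and field names below are assumptions; any roughly uniform,
deterministic hash of the ID works):

import zlib

import apache_beam as beam

NUM_SHARDS = 100  # bounds the parallelism of the batching stage
BATCH_SIZE = 100  # IDs per call to the enrichment endpoint


def add_shard_key(message):
    # A deterministic hash of the ID spreads elements across NUM_SHARDS keys,
    # so each batch mixes many different IDs instead of grouping equal IDs.
    shard = zlib.crc32(message["id"].encode("utf-8")) % NUM_SHARDS
    return shard, message


with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{"id": "a1"}, {"id": "b2"}, {"id": "c3"}])
        | beam.Map(add_shard_key)
        # On an unbounded source, max_buffering_duration_secs flushes partial
        # batches so slow shards do not hold elements back indefinitely.
        | beam.GroupIntoBatches(BATCH_SIZE, max_buffering_duration_secs=10)
        | beam.MapTuple(lambda shard, batch: (shard, list(batch)))
        | beam.Map(print)
    )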

On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas 
wrote:

> Is there a way to do batching in that transformation? I'm assuming for
> now no, or maybe using it in conjunction with GroupIntoBatches.
>
> On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas 
> wrote:
> >
> > Interesting
> >
> > I think the cache feature could be interesting for some use cases I have.
> >
> > On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
> > >
> > > For the new web API IO, the page lists these features:
> > >
> > > developers provide minimal code that invokes Web API endpoint
> > > delegate to the transform to handle request retries and exponential
> backoff
> > > optional caching of request and response associations
> > > optional metrics
> > >
> > >
> > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas 
> wrote:
> > >>
> > >> That one looks interesting
> > >>
> > >> What is not clear to me is what are the advantages of using it? Is
> > >> only the error/retry handling? anything in terms of performance?
> > >>
> > >> My PCollection is unbounded but I was thinking of sending my messages
> > >> in batches to the external API in order to gain some performance
> > >> (don't expect to send 1 http request per message).
> > >>
> > >> Thank you very much for all your responses!
> > >>
> > >>
> > >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user 
> wrote:
> > >> >
> > >> > To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
> > >> >
> > >> > This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
> > >> >
> > >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas <
> ruben.var...@metova.com> wrote:
> > >> >>
> > >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim 
> wrote:
> > >> >> >
> > >> >> > Here is an example from a book that I'm reading now and it may
> be applicable.
> > >> >> >
> > >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > >> >> > PYTHON - ord(id[0]) % 100
> > >> >>
> > >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> > >> >>
> > >> >> >
> > >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian <
> ged1...@gmail.com> wrote:
> > >> >> >>
> > >> >> >> How about just keeping track of a buffer and flush the buffer
> after 100 messages and if there is a buffer on finish_bundle as well?
> > >> >> >>
> > >> >> >>
> > >> >>
> > >> >> If this is in memory, It could lead to potential loss of data.
> That is
> > >> >> why the state is used or at least that is my understanding. but
> maybe
> > >> >> there is a way to do this in the state?
> > >> >>
> > >> >>
> > >> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas <
> ruben.var...@metova.com> wrote:
> > >> >> >>>
> > >> >> >>> Hello guys
> > >> >> >>>
> > >> >> >>> Maybe this question was already answered, but I cannot find
> it  and
> > >> >> >>> want some more input on this topic.
> > >> >> >>>
> > >> >> >>> I have some messages that don't have any particular key
> candidate,
> > >> >> >>> except the ID,  but I don't want to use it because the idea is
> to
> > >> >> >>> group multiple IDs in the same batch.
> > >> >> >>>
> > >> >> >>> This is my use case:
> > >> >> >>>
> > >> >> >>> I have an endpoint where I'm gonna send the message ID, this
> endpoint
> > >> >> >>> is gonna return me certain information which I will use to
> enrich my
> > >> >> >>> message. In order to avoid fetching the endpoint per message I
> want to
> > >> >> >>> batch it in 100 and send the 100 IDs in one request ( the
> endpoint
> > >> >> >>> supports it) . I was thinking on using GroupIntoBatches.
> > >> >> >>>
> > >> >> >>> - If I choose the ID as the key, my understanding is that it
> won't
> > >> >> >>> work in the way I want (because it will form batches of the
> same ID).
> > >> >> >>> - Use a constant will be a problem for parallelism, is that
> correct?
> > >> >> >>>
> > >> >> >>> Then my question is, what should I use as a key? Maybe
> something
> > >> >> >>> regarding the timestamp? so I can have groups of messages that
> arrive
> > >> >> >>> at a certain second?
> > >> >> >>>
> > >> >> >>> Any suggestions would be appreciated
> > >> >> >>>
> > >> >> >>> Thanks.
>


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread XQ Hu via user
For the new web API IO, the page lists these features:

   - developers provide minimal code that invokes Web API endpoint
   - delegate to the transform to handle request retries and exponential
   backoff
   - optional caching of request and response associations
   - optional metrics


On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas 
wrote:

> That one looks interesting
>
> What is not clear to me is what are the advantages of using it? Is
> only the error/retry handling? anything in terms of performance?
>
> My PCollection is unbounded but I was thinking of sending my messages
> in batches to the external API in order to gain some performance
> (don't expect to send 1 http request per message).
>
> Thank you very much for all your responses!
>
>
> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user 
> wrote:
> >
> > To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
> >
> > This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
> >
> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
> wrote:
> >>
> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >> >
> >> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >> >
> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> >> > PYTHON - ord(id[0]) % 100
> >>
> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> >>
> >> >
> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >> >>
> >> >> How about just keeping track of a buffer and flush the buffer after
> 100 messages and if there is a buffer on finish_bundle as well?
> >> >>
> >> >>
> >>
> >> If this is in memory, It could lead to potential loss of data. That is
> >> why the state is used or at least that is my understanding. but maybe
> >> there is a way to do this in the state?
> >>
> >>
> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >> >>>
> >> >>> Hello guys
> >> >>>
> >> >>> Maybe this question was already answered, but I cannot find it  and
> >> >>> want some more input on this topic.
> >> >>>
> >> >>> I have some messages that don't have any particular key candidate,
> >> >>> except the ID,  but I don't want to use it because the idea is to
> >> >>> group multiple IDs in the same batch.
> >> >>>
> >> >>> This is my use case:
> >> >>>
> >> >>> I have an endpoint where I'm gonna send the message ID, this
> endpoint
> >> >>> is gonna return me certain information which I will use to enrich my
> >> >>> message. In order to avoid fetching the endpoint per message I want
> to
> >> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >> >>> supports it) . I was thinking on using GroupIntoBatches.
> >> >>>
> >> >>> - If I choose the ID as the key, my understanding is that it won't
> >> >>> work in the way I want (because it will form batches of the same
> ID).
> >> >>> - Use a constant will be a problem for parallelism, is that correct?
> >> >>>
> >> >>> Then my question is, what should I use as a key? Maybe something
> >> >>> regarding the timestamp? so I can have groups of messages that
> arrive
> >> >>> at a certain second?
> >> >>>
> >> >>> Any suggestions would be appreciated
> >> >>>
> >> >>> Thanks.
>


Re: Any recommendation for key for GroupIntoBatches

2024-04-14 Thread XQ Hu via user
To enrich your data, have you checked
https://cloud.google.com/dataflow/docs/guides/enrichment?

This transform is built on top of
https://beam.apache.org/documentation/io/built-in/webapis/
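
For reference, a minimal sketch of the Enrichment transform with the built-in
Bigtable handler (untested; the project, instance, table, and row-key values
are placeholders, and an HTTP endpoint like yours would need a custom handler
rather than this Bigtable one):

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import (
    BigTableEnrichmentHandler,
)

# Placeholder resources; assumes apache-beam[gcp] 2.54.0 or newer.
handler = BigTableEnrichmentHandler(
    project_id="my-project",
    instance_id="my-instance",
    table_id="my-table",
    row_key="id",  # input field used as the Bigtable row key
)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(id="user-1"), beam.Row(id="user-2")])
        | Enrichment(handler)  # joins each element with its Bigtable row
        | beam.Map(print)
    )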

On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>
> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
> >>
> >>
>
> If this is in memory, It could lead to potential loss of data. That is
> why the state is used or at least that is my understanding. but maybe
> there is a way to do this in the state?
>
>
> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >>>
> >>> Hello guys
> >>>
> >>> Maybe this question was already answered, but I cannot find it  and
> >>> want some more input on this topic.
> >>>
> >>> I have some messages that don't have any particular key candidate,
> >>> except the ID,  but I don't want to use it because the idea is to
> >>> group multiple IDs in the same batch.
> >>>
> >>> This is my use case:
> >>>
> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
> >>> is gonna return me certain information which I will use to enrich my
> >>> message. In order to avoid fetching the endpoint per message I want to
> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >>> supports it) . I was thinking on using GroupIntoBatches.
> >>>
> >>> - If I choose the ID as the key, my understanding is that it won't
> >>> work in the way I want (because it will form batches of the same ID).
> >>> - Use a constant will be a problem for parallelism, is that correct?
> >>>
> >>> Then my question is, what should I use as a key? Maybe something
> >>> regarding the timestamp? so I can have groups of messages that arrive
> >>> at a certain second?
> >>>
> >>> Any suggestions would be appreciated
> >>>
> >>> Thanks.
>


Re: Is Pulsar IO Connector Officially Supported?

2024-04-10 Thread XQ Hu via user
Sounds like a good idea to add a new section. Let me chat with the team
about that. Thanks.

On Wed, Apr 10, 2024 at 12:09 PM Ahmet Altay  wrote:

> Pulsar IO did not change much since it was originally added in 2022. You
> can find about the gaps in this presentation (
> https://2022.beamsummit.org/slides/Developing%20PulsarIO%20Connector.pdf)
> starting slide 52 (next steps). That might give you the background
> information to make an informed decision on whether it will be suitable for
> your use case or not.
>
> XQ -- For the connectors page, would it make sense to add a section for IO
> connectors that are in progress (not completely ready, but something is
> available) for visibility purposes?
>
> On Wed, Apr 10, 2024 at 8:58 AM Vince Castello via user <
> user@beam.apache.org> wrote:
>
>> I see that the Beam 2.38.0 added support for reading and writing topics.
>> Based on your response, does this mean that we should not use the Pulsar IO
>> connector for production use? Let me know.
>>
>> Thanks
>>
>> --Vince
>>
>>
>> On Wed, Apr 10, 2024 at 9:58 AM XQ Hu  wrote:
>>
>>> I think PulsarIO needs more work to be polished.
>>>
>>> On Tue, Apr 9, 2024 at 2:48 PM Vince Castello via user <
>>> user@beam.apache.org> wrote:
>>>
 I see that a Pulsar connector was made available as of BEAM 2.38.0
 release but I don't see Pulsar as an official connector on the page below.
 Is the Pulsar IO connector official or not? If official then can someone
 please update the page since it gives the idea that a Pulsar IO connector
 is not available.


 https://beam.apache.org/documentation/io/connectors/


 Thanks


 --Vince






Re: Is Pulsar IO Connector Officially Supported?

2024-04-10 Thread XQ Hu via user
I think PulsarIO needs more work to be polished.

On Tue, Apr 9, 2024 at 2:48 PM Vince Castello via user 
wrote:

> I see that a Pulsar connector was made available as of BEAM 2.38.0 release
> but I don't see Pulsar as an official connector on the page below. Is the
> Pulsar IO connector official or not? If official then can someone please
> update the page since it gives the idea that a Pulsar IO connector is not
> available.
>
>
> https://beam.apache.org/documentation/io/connectors/
>
>
> Thanks
>
>
> --Vince
>
>
>
>


Re: how to enable debugging mode for python worker harness

2024-03-31 Thread XQ Hu via user
That is strange. I did nothing special but cloned your repo and then:
1. docker-compose -f docker-compose.yaml up
2. I just ran both ways for a simple t.py test, which works well
t.py:

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(
            [(0, "ttt"), (0, "ttt1"), (0, "ttt2"), (1, "xxx"), (1, "xxx2"),
             (2, "yyy")]
        )
        | beam.Map(print)
    )

python t.py \
  --topic test --group test-group --bootstrap-server
host.docker.internal:9092 \
  --job_endpoint host.docker.internal:8099 \
  --artifact_endpoint host.docker.internal:8098 \
  --environment_type=EXTERNAL \
  --environment_config=host.docker.internal:5

python t.py \
  --topic test --group test-group --bootstrap-server localhost:9092 \
  --job_endpoint localhost:8099 \
  --artifact_endpoint localhost:8098 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:5

outputs:

(0, 'ttt')
(0, 'ttt1')
(0, 'ttt2')
(1, 'xxx')
(1, 'xxx2')
(2, 'yyy')


On Sun, Mar 31, 2024 at 7:15 PM Lydian Lee  wrote:

> Hi XQ,
>
> Sorry to bother you again, but I've tested the same thing again in a linux
> env, and it is still not working and showing the same error in the python
> worker harness.  (Note that this won't fail immediately, but it is failing
> after the task is assigned to task manager and the python worker harness is
> starting to work)
>
> Wondering if you can share what you've changed  (maybe a PR) so that I can
> test again on my linux machine. Thanks so much for your help.  There's
> someone else also pinging me on the same error when testing, and I do want
> to make this work for everyone.   Thanks!
>
>
>
> On Mon, Mar 18, 2024 at 6:24 PM XQ Hu via user 
> wrote:
>
>> I did not do anything special but ran `docker-compose -f
>> docker-compose.yaml up` from your repo.
>>
>> On Sun, Mar 17, 2024 at 11:38 PM Lydian Lee 
>> wrote:
>>
>>> Hi XQ,
>>>
>>> The code is simplified from my previous work and thus it is still using
>>> the old version. But I've tested with Beam 2.54.0 and the code still works
>>> (I mean using my company's image.)  If this is running well in your linux,
>>> I guess there could be something related to how I build the docker image.
>>> Curious if you could share the image you built to docker.io so that I
>>> can confirm if the problem is related to only the image, thanks.
>>>
>>> The goal for this repo is to complete my previous talk:
>>> https://www.youtube.com/watch?v=XUz90LpGAgc&ab_channel=ApacheBeam
>>>
>>> On Sun, Mar 17, 2024 at 8:07 AM XQ Hu via user 
>>> wrote:
>>>
>>>> I cloned your repo on my Linux machine, which is super useful to run.
>>>> Not sure why you use Beam 2.41 but anyway, I tried this on my Linux 
>>>> machine:
>>>>
>>>> python t.py \
>>>>   --topic test --group test-group --bootstrap-server localhost:9092 \
>>>>   --job_endpoint localhost:8099 \
>>>>   --artifact_endpoint localhost:8098 \
>>>>   --environment_type=EXTERNAL \
>>>>   --environment_config=localhost:5
>>>>
>>>> Note I replaced host.docker.internal with localhost and it runs well.
>>>>
>>>> I then tried to use host.docker.internal and it also runs well,
>>>>
>>>> Maybe this is related to your Mac setting?
>>>>
>>>> On Sun, Mar 17, 2024 at 8:34 AM Lydian Lee 
>>>> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> Just FYI, the similar things works on a different image with the one I
>>>>> built using my company’s image as base image. I’ve only replaced the base
>>>>> image with ubuntu. But given that the error log is completely not helpful,
>>>>> it’s really hard for me to continue debugging on the issue though.
>>>>>
>>>>> The docker is not required on my base image as I’ve already add extra
>>>>> args to ReadFromKafka with default environment to be Process. This is 
>>>>> proof
>>>>> to work with my company’s docker image. For the host.internal.docker which
>>>>> is also supported by docker for mac. The only thing i need to do is to
>>>>> configure /etc/hosts so that i can submit the job directly from the laptop
>>>>> and not the flink master.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Mar 17, 2024 at 2:40 AM Jaehyeon Kim 
>>>>> wrote:

Re: DLQ Implementation

2024-03-27 Thread XQ Hu via user
You can check
https://github.com/search?q=repo%3Ajohnjcasey%2Fbeam%20withBadRecordErrorHandler=code.
The test code shows how to use them; more documentation will be added later.

On Wed, Mar 27, 2024 at 7:15 PM Ruben Vargas 
wrote:

> Hello all
>
> Maybe a silly question. Are there any  suggestions for implementing a DLQ
> in my beam pipeline?
>
> Currently I'm using this library https://github.com/tosun-si/asgarde
> which is not bad, the only issue I found  is that sometimes it is hard  to
> use with GroupIntoBatches or other native transformations.
>
> Then I saw this PR https://github.com/apache/beam/pull/29164/files which
> I think is some sort of DLQ support? but I cannot find something on the
> docs for that and I am not 100% familiar with all the beam code to
> understand very well.
>
> Appreciated your help
>
>
> Thank you
> -Ruben
>


Re: What's the current status of pattern matching with Beam SQL?

2024-03-24 Thread XQ Hu via user
I do not think anyone plans to work on these issues in the near future.

On Sun, Mar 24, 2024 at 6:59 PM Jaehyeon Kim  wrote:

> Hi XQ
>
> Thanks for your reply. I had a quick check and there seem to be two
> streams of discussions.
>
> 1. (BEAM-9543) BeamSQL Pattern Recognization Functionality -
> https://issues.apache.org/jira/browse/BEAM-9543
>
> I see the code is merged but the ticket indicates it is partially
> supported due to the time limit for Google Summer of Code 2020. It also
> indicates full functionality would be via constructing a CEP (Complex Event
> Processing) library, which is discussed in BEAM-3767.
>
> 2. (BEAM-3767) A Complex Event Processing (CEP) library/extension for
> Apache Beam - https://issues.apache.org/jira/browse/BEAM-3767
>
> The goal is to implement an efficient pattern matching library inspired by
> eg) Apache Flink CEP.
>
> Both the tickets have an associating GitHub issue and no update for more
> than 1 year, which means they are not likely to be completed in the near
> future?
>
> Cheers,
> Jaehyeon
>
>
> On Sun, 24 Mar 2024 at 12:02, XQ Hu via user  wrote:
>
>> https://beam.apache.org/documentation/dsls/sql/zetasql/overview/
>> and
>>
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
>> for the supported functions.
>>
>> On Sat, Mar 23, 2024 at 5:50 AM Jaehyeon Kim  wrote:
>>
>>> Hello,
>>>
>>> I found a blog article about pattern matching with Beam SQL -
>>> https://beam.apache.org/blog/pattern-match-beam-sql/. All of the PRs
>>> and commits that are included in the post are merged.
>>>
>>> On the other hand, the Beam Calcite SQL overview page indicates
>>> MATCH_RECOGNIZE is not supported (
>>> https://beam.apache.org/documentation/dsls/sql/calcite/overview/). I
>>> haven't found a section whether ZetaSQL supports it or not.
>>>
>>> Can you please inform me if it is supported and relevant resources if so?
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>


Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-23 Thread XQ Hu via user
My attempt to fix https://github.com/apache/beam/issues/25598:
https://github.com/apache/beam/pull/30728

On Thu, Mar 21, 2024 at 10:35 AM Ondřej Pánek 
wrote:

> Hello Jan,
>
>
>
> thanks a lot for for the detailed answer! So during the last week, the
> consumer changed their requirements on the output to BigQuery. That, as we
> understand, is available even in Beam Python SDK, and we have already a PoC
> for it.
>
>
>
> You have a very good point with the bullet c). That’s indeed our case now.
> The data will really be only transferred from topics on the Kafka
> MirrorMaker cluster (managed by the customer) sitting in the GCP to
> BigQuery, rather than performing some huge Transformations. However, the
> number of topics is quite large (hundreds), and the customer wants to have
> additional flexibility when adding/removing topics so the transfer job will
> dynamically take the changes.
>
>
>
> TBH we also started to think about PySpark Streaming in Dataproc and
> created some PoC there as well. Looks more light weight than Dataflow &
> Beam for the initial runs.
>
>
>
> Also, and you definitely will know better, looks like the offset
> management in Beam/Dataflow for streaming is a bit of a “black box”
> compared to the external storage for the offsets in Spark Streaming. The
> problem I’m having with Dataflow now, is that after the job’s removal, the
> internal state, hence offsets, is reset, and one needs to make sure
> (somehow?) the consumed data is not duplicated/lost in case of Dataflow job
> restart.
>
>
>
> Another Kafka cluster with Kafka Connect is not really an option, again
> based on the customer’s requirements. The Data Engineering team wants to
> have the full control on this ingestion solution, and Kafka cluster
> management is not in their scope, what’s more neither is Java in general.
>
>
>
> Thanks for the answers and opinions so far!
>
>
>
> Best,
>
>
>
> Ondřej
>
>
>
>
>
>
>
> *From: *Jan Lukavský 
> *Date: *Thursday, March 14, 2024 at 14:13
> *To: *user@beam.apache.org 
> *Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Self-correction, as you are using a streaming pipeline without final
> watermark emission (I suppose), option (a) will not work. Patching the sink
> to support generic windowing would be probably much more involved.
>
> On 3/14/24 14:07, Jan Lukavský wrote:
>
> Hi Ondřej,
>
> I'll start with a disclaimer; I'm not exactly an expert on neither python
> SDK nor ParquetIO, so please take these just as a suggestions from the top
> of my head.
>
> First, it seems that the current implementation of WriteToParquet really
> does not play well with streaming pipelines. There are several options that
> could be used to overcome this limitation:
>
>  a) you can try fixing the sink, maybe adding
> AfterWatermark.pastEndOfWindow() trigger might be enough to make it work
> (need to be tested)
>
>  b) if the Java implementation of ParquetIO works for streaming pipelines
> (and I would suppose it does), you can use cross-language transform to run
> ParquetIO from python, see [1] for quick start
>
>  c) generally speaking, using a full-blown streaming engine for tasks like
> "buffer this and store it in bulk after a timeout" is inefficient.
> Alternative approach would be just to use KafkaConsumer, create parquet
> files on local disk, push them to GCS and commit offsets afterwards.
> Streaming engines buffer data in replicated distributed state which adds
> unneeded complexity
>
>  d) if there is some non-trivial processing between consuming elements
> from Kafka and writing outputs, then it might be an alternative to process
> the data in streaming pipeline, write outputs back to Kafka and then use
> approach (c) to get it to GCS
>
> The specific solution depends on the actual requirements of your customers.
>
> Best,
>
>  Jan
>
> [1]
> https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
>
> On 3/14/24 09:34, Ondřej Pánek wrote:
>
> Basically, this is the error we receive when trying to use avro or parquet
> sinks (attached image).
>
> Also, check the sample pipeline that triggers this error (when deploying
> with DataflowRunner). So obviously, there is no global window or default
> trigger. That’s, I believe, what’s described in the issue:
> https://github.com/apache/beam/issues/25598
>
>
>
>
>
> *From: *Ondřej Pánek  
> *Date: *Thursday, March 14, 2024 at 07:57
> *To: *user@beam.apache.org  
> *Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Hello, thanks for the reply!
>
> Please, refer to these:

Re: What's the current status of pattern matching with Beam SQL?

2024-03-23 Thread XQ Hu via user
https://beam.apache.org/documentation/dsls/sql/zetasql/overview/
and
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
for the supported functions.

On Sat, Mar 23, 2024 at 5:50 AM Jaehyeon Kim  wrote:

> Hello,
>
> I found a blog article about pattern matching with Beam SQL -
> https://beam.apache.org/blog/pattern-match-beam-sql/. All of the PRs and
> commits that are included in the post are merged.
>
> On the other hand, the Beam Calcite SQL overview page indicates
> MATCH_RECOGNIZE is not supported (
> https://beam.apache.org/documentation/dsls/sql/calcite/overview/). I
> haven't found a section whether ZetaSQL supports it or not.
>
> Can you please inform me if it is supported and relevant resources if so?
>
> Cheers,
> Jaehyeon
>


Re: how to enable debugging mode for python worker harness

2024-03-18 Thread XQ Hu via user
I did not do anything special but ran `docker-compose -f
docker-compose.yaml up` from your repo.

On Sun, Mar 17, 2024 at 11:38 PM Lydian Lee  wrote:

> Hi XQ,
>
> The code is simplified from my previous work and thus it is still using
> the old version. But I've tested with Beam 2.54.0 and the code still works
> (I mean using my company's image.)  If this is running well in your linux,
> I guess there could be something related to how I build the docker image.
> Curious if you could share the image you built to docker.io so that I can
> confirm if the problem is related to only the image, thanks.
>
> The goal for this repo is to complete my previous talk:
> https://www.youtube.com/watch?v=XUz90LpGAgc_channel=ApacheBeam
>
> On Sun, Mar 17, 2024 at 8:07 AM XQ Hu via user 
> wrote:
>
>> I cloned your repo on my Linux machine, which is super useful to run. Not
>> sure why you use Beam 2.41 but anyway, I tried this on my Linux machine:
>>
>> python t.py \
>>   --topic test --group test-group --bootstrap-server localhost:9092 \
>>   --job_endpoint localhost:8099 \
>>   --artifact_endpoint localhost:8098 \
>>   --environment_type=EXTERNAL \
>>   --environment_config=localhost:5
>>
>> Note I replaced host.docker.internal with localhost and it runs well.
>>
>> I then tried to use host.docker.internal and it also runs well,
>>
>> Maybe this is related to your Mac setting?
>>
>> On Sun, Mar 17, 2024 at 8:34 AM Lydian Lee 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> Just FYI, the similar things works on a different image with the one I
>>> built using my company’s image as base image. I’ve only replaced the base
>>> image with ubuntu. But given that the error log is completely not helpful,
>>> it’s really hard for me to continue debugging on the issue though.
>>>
>>> The docker is not required on my base image as I’ve already add extra
>>> args to ReadFromKafka with default environment to be Process. This is proof
>>> to work with my company’s docker image. For the host.internal.docker which
>>> is also supported by docker for mac. The only thing i need to do is to
>>> configure /etc/hosts so that i can submit the job directly from the laptop
>>> and not the flink master.
>>>
>>>
>>>
>>>
>>> On Sun, Mar 17, 2024 at 2:40 AM Jaehyeon Kim  wrote:
>>>
>>>> Hello,
>>>>
>>>> The pipeline runs in host while host.docker.internal would only be
>>>> resolved on the containers that run with the host network mode. I guess the
>>>> pipeline wouldn't be accessible to host.docker.internal and fails to run.
>>>>
>>>> If everything before ReadFromKafka works successfully, a docker
>>>> container will be launched with the host network mode so that
>>>> host.docker.internal:9092 can be resolved inside the container. As far as
>>>> I've checked, however, it fails when I start a flink cluster on docker and
>>>> I had to rely on a local flink cluster. If you'd like to try to use docker,
>>>> you should have docker installed on your custom docker image and
>>>> volume-map /var/run/docker.sock to the flink task manager. Otherwise, it
>>>> won't be able to launch a Docker container for reading kafka messages.
>>>>
>>>> Cheers,
>>>> Jaehyeon
>>>>
>>>>
>>>> On Sun, 17 Mar 2024 at 18:21, Lydian Lee 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have an issue when setting up a POC of  Python SDK with Flink runner
>>>>> to run in docker-compose.  The python worker harness was not returning any
>>>>> error but:
>>>>> ```
>>>>> python-worker-harness-1  | 2024/03/17 07:10:17 Executing: python -m
>>>>> apache_beam.runners.worker.sdk_worker_main
>>>>> python-worker-harness-1  | 2024/03/17 07:10:24 Python exited: 
>>>>> ```
>>>>> and dead.  The error message seems totally unuseful, and I am
>>>>> wondering if there's a way to make the harness script show more debug
>>>>> logging.
>>>>>
>>>>> I started my harness via:
>>>>> ```
>>>>> /opt/apache/beam/boot --worker_pool
>>>>> ```
>>>>> and configure my script to use the harness
>>>>> ```
>>>>> python docker/src/example.py \
>>>>>   --topic test --group test-group --bootstrap-server
>>>>> host.docker.internal:9092 \
>>>>>   --job_endpoint host.docker.internal:8099 \
>>>>>   --artifact_endpoint host.docker.internal:8098 \
>>>>>   --environment_type=EXTERNAL \
>>>>>   --environment_config=host.docker.internal:5
>>>>> ```
>>>>> The full settings is available in:
>>>>> https://github.com/lydian/beam-python-flink-runner-examples
>>>>> Thanks for your help
>>>>>
>>>>>


Re: how to enable debugging mode for python worker harness

2024-03-17 Thread XQ Hu via user
I cloned your repo on my Linux machine, which is super useful to run. Not
sure why you use Beam 2.41 but anyway, I tried this on my Linux machine:

python t.py \
  --topic test --group test-group --bootstrap-server localhost:9092 \
  --job_endpoint localhost:8099 \
  --artifact_endpoint localhost:8098 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:5

Note I replaced host.docker.internal with localhost and it runs well.

I then tried to use host.docker.internal and it also runs well.

Maybe this is related to your Mac setting?

On Sun, Mar 17, 2024 at 8:34 AM Lydian Lee  wrote:

>
> Hi,
>
> Just FYI, the similar things works on a different image with the one I
> built using my company’s image as base image. I’ve only replaced the base
> image with ubuntu. But given that the error log is completely not helpful,
> it’s really hard for me to continue debugging on the issue though.
>
> The docker is not required on my base image as I’ve already add extra args
> to ReadFromKafka with default environment to be Process. This is proof to
> work with my company’s docker image. For the host.internal.docker which is
> also supported by docker for mac. The only thing i need to do is to
> configure /etc/hosts so that i can submit the job directly from the laptop
> and not the flink master.
>
>
>
>
> On Sun, Mar 17, 2024 at 2:40 AM Jaehyeon Kim  wrote:
>
>> Hello,
>>
>> The pipeline runs in host while host.docker.internal would only be
>> resolved on the containers that run with the host network mode. I guess the
>> pipeline wouldn't be accessible to host.docker.internal and fails to run.
>>
>> If everything before ReadFromKafka works successfully, a docker container
>> will be launched with the host network mode so that
>> host.docker.internal:9092 can be resolved inside the container. As far as
>> I've checked, however, it fails when I start a flink cluster on docker and
>> I had to rely on a local flink cluster. If you'd like to try to use docker,
>> you should have docker installed on your custom docker image and
>> volume-map /var/run/docker.sock to the flink task manager. Otherwise, it
>> won't be able to launch a Docker container for reading kafka messages.
>>
>> Cheers,
>> Jaehyeon
>>
>>
>> On Sun, 17 Mar 2024 at 18:21, Lydian Lee  wrote:
>>
>>> Hi,
>>>
>>> I have an issue when setting up a POC of  Python SDK with Flink runner
>>> to run in docker-compose.  The python worker harness was not returning any
>>> error but:
>>> ```
>>> python-worker-harness-1  | 2024/03/17 07:10:17 Executing: python -m
>>> apache_beam.runners.worker.sdk_worker_main
>>> python-worker-harness-1  | 2024/03/17 07:10:24 Python exited: 
>>> ```
>>> and dead.  The error message seems totally unuseful, and I am wondering
>>> if there's a way to make the harness script show more debug logging.
>>>
>>> I started my harness via:
>>> ```
>>> /opt/apache/beam/boot --worker_pool
>>> ```
>>> and configure my script to use the harness
>>> ```
>>> python docker/src/example.py \
>>>   --topic test --group test-group --bootstrap-server
>>> host.docker.internal:9092 \
>>>   --job_endpoint host.docker.internal:8099 \
>>>   --artifact_endpoint host.docker.internal:8098 \
>>>   --environment_type=EXTERNAL \
>>>   --environment_config=host.docker.internal:5
>>> ```
>>> The full settings is available in:
>>> https://github.com/lydian/beam-python-flink-runner-examples
>>> Thanks for your help
>>>
>>>


Re: java.lang.ClassCastException: class java.lang.String cannot be cast to class...

2024-03-17 Thread XQ Hu via user
Here is what I did, including how I set up the portable runner with Flink:

1. Start the local Flink cluster
2. Start the Flink job server and point it to that local cluster:
   docker run --net=host apache/beam_flink1.16_job_server:latest --flink-master=localhost:8081
3. I use these pipeline options in the code:
   options = PipelineOptions(parallelism=1, environment_type="LOOPBACK", job_endpoint="localhost:8099", streaming=True)
4. The key, I think, is to explicitly specify the output types for TestStream like this:
   TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)

These at least work for me.
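Putting steps 3 and 4 together, a minimal sketch; the runner setting and the sample elements are my assumptions, while the options and the TestStream coder/output type mirror the steps above:

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream

# Assumes the Flink job server from step 2 is listening on localhost:8099;
# runner="PortableRunner" is my assumption, the rest mirrors step 3.
options = PipelineOptions(
    runner="PortableRunner",
    job_endpoint="localhost:8099",
    environment_type="LOOPBACK",
    parallelism=1,
    streaming=True,
)

# Step 4: give TestStream an explicit coder and output type.
test_stream = (
    TestStream(coder=coders.StrUtf8Coder())
    .add_elements(["lorem ipsum", "dolor sit amet"])  # sample elements, not from the thread
    .with_output_types(str)
)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | test_stream
        | beam.FlatMap(str.split)
        | beam.Map(print)
    )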

On Thu, Mar 14, 2024 at 4:37 PM Jaehyeon Kim  wrote:

> Hello,
>
> I am trying a simple word count pipeline in a streaming environment using
> TestStream (Python SDK). While it works with the DirectRunner, it fails on
> the FlinkRunner with the following error. It looks like a type casting
> issue.
>
> Traceback (most recent call last):
>   File
> "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py",
> line 78, in 
> run()
>   File
> "/home/jaehyeon/projects/general-demos/beam-pipelines/chapter1/first_streaming_pipeline.py",
> line 74, in run
> p.run().wait_until_finish()
>   File
> "/home/jaehyeon/projects/general-demos/venv/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 576, in wait_until_finish
> raise self._runtime_exception
> RuntimeError: Pipeline
> BeamApp-jaehyeon-0314203421-dfc96365_ba750d30-ff27-439d-a6ad-ce835f88fdf7
> failed in state FAILED: java.lang.ClassCastException: class
> java.lang.String cannot be cast to class [B (java.lang.String and [B are in
> module java.base of loader 'bootstrap')
>
> Can you please inform me how to fix it? Below shows the pipeline code.
>
> Cheers,
> Jaehyeon
>
> import os
> import datetime
> import argparse
> import logging
> import re
>
> import apache_beam as beam
> from apache_beam.coders import coders
> from apache_beam.transforms import window
> from apache_beam.transforms.trigger import AfterWatermark,
> AccumulationMode
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.transforms.window import TimestampedValue
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
>
>
> def read_file(filename: str, inputpath: str):
> with open(os.path.join(inputpath, filename), "r") as f:
> return f.readlines()
>
>
> def tokenize(element: str):
> return re.findall(r"[A-Za-z\']+", element)
>
>
> def run():
> parser = argparse.ArgumentParser(description="Beam pipeline arguments"
> )
> parser.add_argument(
> "--inputs",
> default="inputs",
> help="Specify folder name that event records are saved",
> )
> parser.add_argument(
> "--runner", default="DirectRunner", help="Specify Apache Beam
> Runner"
> )
> opts = parser.parse_args()
> # PARENT_DIR =
> os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
>
> options = PipelineOptions()
> options.view_as(StandardOptions).runner = opts.runner
>
> lines = [
> "Lorem ipsum dolor sit amet, consectetuer adipiscing elit.
> Vestibulum erat nulla, ullamcorper nec, rutrum non, nonummy ac, erat. Nulla
> non lectus sed nisl molestie malesuada. Cras elementum. Integer in sapien.
> Mauris elementum mauris vitae tortor. Aliquam ante. Cum sociis natoque
> penatibus et magnis dis parturient montes, nascetur ridiculus mus. In enim
> a arcu imperdiet malesuada. Neque porro quisquam est, qui dolorem ipsum
> quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius
> modi tempora incidunt ut labore et dolore magnam aliquam quaerat
> voluptatem. Aliquam erat volutpat. Excepteur sint occaecat cupidatat non
> proident, sunt in culpa qui officia deserunt mollit anim id est laborum. In
> enim a arcu imperdiet malesuada. Class aptent taciti sociosqu ad litora
> torquent per conubia nostra, per inceptos hymenaeos."
> "Duis pulvinar. Integer pellentesque quam vel velit. Sed
> convallis magna eu sem. Phasellus rhoncus. Aliquam erat volutpat. Quisque
> porta. Maecenas fermentum, sem in pharetra pellentesque, velit turpis
> volutpat ante, in pharetra metus odio a lectus. Fusce suscipit libero eget
> elit. Curabitur vitae diam non enim vestibulum interdum. Nam quis nulla.
> Etiam dui sem, fermentum vitae, sagittis id, malesuada in, quam. Aliquam
> ornare wisi eu metus. Aenean vel massa quis mauris vehicula lacinia. Nam
> libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit
> quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda
> est, omnis dolor repellendus."
> ]
> # lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
> now = int(datetime.datetime.now().timestamp() * 1000)
> test_stream = (
> TestStream(coder=coders.StrUtf8Coder())
> .add_elements(
> 

Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-13 Thread XQ Hu via user
Can you explain more about " that current sinks for Avro and Parquet with
the destination of GCS are not supported"?

We do have AvroIO and ParquetIO (
https://beam.apache.org/documentation/io/connectors/) in Python.
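For reference, a minimal batch-style sketch of writing Avro with the Python SDK; the bucket path, schema, and records are made up for illustration, and the streaming/windowing behaviour of these sinks is the separate concern discussed further down this thread:

import apache_beam as beam
from apache_beam.io.avroio import WriteToAvro

# Hypothetical Avro schema and records, only to show the sink's API.
SCHEMA = {
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "payload", "type": "string"},
    ],
}

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{"id": 1, "payload": "a"}, {"id": 2, "payload": "b"}])
        | WriteToAvro(
            "gs://my-bucket/events/out",  # hypothetical GCS prefix
            schema=SCHEMA,
            file_name_suffix=".avro",
        )
    )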

On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek  wrote:

> Hello Beam team!
>
>
>
> We’re currently onboarding customer’s infrastructure to the Google Cloud
> Platform. The decision was made that one of the technologies they will use
> is Dataflow. Let me briefly the usecase specification:
>
> They have kafka cluster where data from CDC data source is stored. The
> data in the topics is stored as Avro format. Their other requirement is
> they want to have a streaming solution reading from these Kafka topics, and
> writing to the Google Cloud Storage again in Avro. What’s more, the
> component should be written in Python, since their Data Engineers heavily
> prefer Python instead of Java.
>
>
>
> We’ve been struggling with the design of the solution for couple of weeks
> now, and we’re facing quite unfortunate situation now, not really finding
> any solution that would fit these requirements.
>
>
>
> So the question is: Is there any existing Dataflow template/solution with
> the following specifications:
>
>- Streaming connector
>- Written in Python
>- Consumes from Kafka topics
>- Reads Avro with Schema Registry
>- Writes Avro to GCS
>
>
>
> We found out, that current sinks for Avro and Parquet with the destination
> of GCS are not supported for Python at the moment, which is basically the
> main blocker now.
>
>
>
> Any recommendations/suggestions would be really highly appreciated!
>
>
>
> Maybe the solution really does not exist and we need to create our own
> custom connector for it. The question in this case would be if that’s even
> possible theoretically, since we would really need to avoid another dead
> end.
>
>
>
> Thanks a lot for any help!
>
>
>
> Kind regards,
>
> Ondrej
>


Re: How to change SQL dialect on beam_sql magic?

2024-03-08 Thread XQ Hu via user
I do not think the dialect argument is exposed here:
https://github.com/apache/beam/blob/a391198b5a632238dc4a9298e635bb5eb0f433df/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py#L293

Two options:
1) create a feature request and PR to add that
2) Switch to SqlTransform
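For option 2, a minimal sketch of passing the dialect to SqlTransform directly; the row type, query, and dialect string are my assumptions for illustration, and the transform expands through the cross-language service, so a Java runtime is required:

from typing import NamedTuple

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform


class SimpleRow(NamedTuple):
    id: int
    name: str


beam.coders.registry.register_coder(SimpleRow, beam.coders.RowCoder)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([SimpleRow(1, "a"), SimpleRow(2, "b")]).with_output_types(SimpleRow)
        # dialect is forwarded to the underlying Java SqlTransform; "zetasql" is assumed here.
        | SqlTransform("SELECT id, name FROM PCOLLECTION", dialect="zetasql")
        | beam.Map(print)
    )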

On Mon, Mar 4, 2024 at 12:14 AM Jaehyeon Kim  wrote:

> Hello,
>
> beam_sql magic doesn't seem to have an option to specify an SQL dialect
> while the underlying SqlTransform has the dialect argument. Is there a way
> to specify an SQL dialect on a notebook?
>
> Cheers,
> Jaehyeon
>
> [image: image.png]
>


Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread XQ Hu via user
Is this what you are looking for?

import random
import time

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

with beam.Pipeline() as p:
    input = (
        p
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=time.time() + 16,
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: random.random())
        | beam.WindowInto(window.FixedWindows(4))
        | beam.GroupBy()
        | "Print Windows"
        >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
    )

On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user <
user@beam.apache.org> wrote:

> *Hello Beam Users!*
>
>
>
> I was looking into a simple example in Python to have an unbound
> (--streaming flag ) pipeline that generated random numbers , applied a
> Fixed Window (let’s say 5 seconds) and then applies a group by operation (
> reshuffle) and print the result just to check.
>
>
>
> I notice that this seems to work as long as there is no grouping operation
> (reshuffle, groupBy ,etc. ) that would leverage the windowing semantics.
>
>
>
> *#Get Parameters from Command Line for the Pipeline*
>
> known_args, pipeline_options = parser.parse_known_args(argv)
>
> pipeline_options = PipelineOptions(flags=argv)
>
>
>
> *#Create pipeline*
>
> p = beam.Pipeline(options=pipeline_options)
>
>
>
>
>
> *#Execute Pipeline*
>
> (p | "Start pipeline " >> beam.Create([0])
>
> | "Get values"  >> beam.ParDo(RandomNumberGenerator())
>
> | 'Applied fixed windows ' >> beam.WindowInto(
> window.FixedWindows(1*5) )
>
> | 'Reshuffle ' >> beam.Reshuffle()
>
> |  "Print" >> beam.Map(lambda x: print ("{} - {} ".format(os.getpid(),
> x) ,flush=True ) )
>
> )
>
>
>
> result = p.run()
>
> result.wait_until_finish()
>
>
>
>
>
> Even thought the  Random Generator is unbound and tagged as so with the
> decorator, it seems to stuck, if I make that step finite (i.e. adding a
> counter and exiting) then the code works in regular batch mode.
>
>
>
> #
> =
>
> # Class for Splittable Do  Random Generatered numbers
>
> #
> =
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
>
> class RandomNumberGenerator(beam.DoFn):
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
>
> def process(self, element ):
>
> import random
>
> import time
>
>
>
> counter=0
>
>
>
>
>
> while True:
>
>
>
> #if counter>5:
>
> #break
>
> nmb = random.randint(0, 1000)
>
> wait = random.randint(0, 5)
>
> rnow = time.time()
>
>
>
>
>
> print("Randy random", nmb)
>
>
>
> yield beam.window.TimestampedValue(nmb, rnow)
>
> time.sleep(wait)
>
> counter+=1
>
>
>
> I have tried to implement as per documentation the tracker and watermark,
> but it seems that none of that seems to work either for the *DirectRunner
> or FlinkRunner*  (even there where reshuffle is not a custom operation
> but a vertex between the different ParDos). It seems to just stuck.
>
>
>
> I event tried using the native PeriodicImpusle
> 
> as to factor out any of my implementation on it, however I still got the
> same result of it being ‘stuck’ on the GroupBy/Reshuffle operation.
>
>
>
> In the past I have created with the Java SDK a Unbound Source (now
> obsoleted it seems according to doc)   streaming pipelines, however I
> noticed that  most of the unbound python readers like Kakfa
> 
>  and PubSub
> 
> use ExternalTransforms behind the scenes so I am starting to wonder if such
> unbound sources are supported at all natively in Python.
>
>
>
> I have done some Internet search and even tried  LLMs to get to have a
> suggestion but I don’t seem to be successful in getting a clear answer on
> how to achieve this in Python or if this is even possible and after
> spending a couple days I figure I could ask the beam team and hear your
> thoughts about it and if you can reference me to any sample that might work
> so I can analyze it forward to understand what is missing would be greatly
> appreciated.
>
>
>
>
>
>
>
> Regards,
>
> *JP – A fellow Apache Beam enthusiast *
>
> --
>

Re: Cross Language Runtime error python-Java

2024-02-24 Thread XQ Hu via user
Great, you figured it out. Thanks for posting this back to the list.

On Sat, Feb 24, 2024 at 5:23 PM George Dekermenjian 
wrote:

> Adding the following to both Dockerfile.launcher and Dockerfile.worker did
> the trick for me.
>
>
>
> COPY --from=apache/beam_java11_sdk:latest /opt/apache/beam/jars
> /opt/apache/beam/jars COPY --from=apache/beam_java11_sdk:latest
> /opt/java/openjdk /opt/java/openjdk ENV JAVA_HOME=/opt/java/openjdk ENV
> PATH="${JAVA_HOME}/bin:${PATH}"
>
>
>
> On Sat, Feb 24, 2024 at 21:55 XQ Hu via user  wrote:
>
>> Does your code work without the launcher? Better check this step by step
>> to figure out which part causes this error.
>>
>> On Sat, Feb 24, 2024 at 3:25 AM George Dekermenjian 
>> wrote:
>>
>>> I have a python pipeline that uses the bigquery storage write method
>>> (cross language with Java). I’m building launcher and worker docker images
>>> and then launching the flex template. The launcher fails due to the
>>> following runtime error in dataflow.
>>>
>>> I’m using runner v2 and it is a streaming pipeline using the streaming
>>> engine.
>>>
>>> Any ideas of what is causing this?
>>>
>>> RuntimeError:
>>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>> java.lang.ClassCastException: class
>>> org.apache.beam.sdk.transforms.windowing.Repeatedly cannot be cast to class
>>> org.apache.beam.sdk.transforms.windowing.Trigger$OnceTrigger
>>> (org.apache.beam.sdk.transforms.windowing.Repeatedly and
>>> org.apache.beam.sdk.transforms.windowing.Trigger$OnceTrigger are in unnamed
>>> module of loader 'app')"
>>> line: "exec.go:66"
>>>
>>>


Re: Problem in jdbc connector with autoincrement value

2024-02-24 Thread XQ Hu via user
Here is what I did:

CREATE TABLE IF NOT EXISTS test2 (
  id bigint DEFAULT nextval('sap_tm_customer_id_seq'::regclass) NOT NULL,
  name VARCHAR(10),
  load_date_time TIMESTAMP
)

make sure id cannot be NULL (you might not need this).

I tried this for my data without using the id field:

class ExampleRow(NamedTuple):
    name: str
    load_date_time: str

For the statement, I used this:

"INSERT INTO test2 VALUES(DEFAULT, ?,?::timestamp)"

DEFAULT fills in the id using sap_tm_customer_id_seq.

I hope this is what you are looking for.
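Putting those pieces together, a minimal sketch of the write; the connection settings mirror the original snippet and are assumptions for illustration:

from typing import NamedTuple

import apache_beam as beam
from apache_beam.io.jdbc import WriteToJdbc


class ExampleRow(NamedTuple):
    name: str
    load_date_time: str


beam.coders.registry.register_coder(ExampleRow, beam.coders.RowCoder)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(
            [ExampleRow(name="adsf", load_date_time="2023-04-05 12:34:55")]
        ).with_output_types(ExampleRow)
        | "Write to jdbc" >> WriteToJdbc(
            driver_class_name="org.postgresql.Driver",
            jdbc_url="jdbc:postgresql://localhost:5432/postgres",
            username="postgres",
            password="postgres",
            table_name="test2",
            connection_properties="stringtype=unspecified",
            # DEFAULT lets Postgres fill id from sap_tm_customer_id_seq.
            statement="INSERT INTO test2 VALUES(DEFAULT, ?, ?::timestamp)",
        )
    )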


On Mon, Feb 19, 2024 at 5:57 PM Juan Romero  wrote:

> Hi guys. I have a table in apache beam that has an auto increment id with
> a sequence.
>
>
>
>
>
>
> *CREATE SEQUENCE sap_tm_customer_id_seq;CREATE TABLE IF NOT EXISTS test (
>  id bigint DEFAULT nextval('sap_tm_customer_id_seq'::regclass),   name
> VARCHAR(10),   load_date_time TIMESTAMP);*
>
> And i have the following pipeline to make the insert into the table:
> with beam.Pipeline() as p:
>   _ = (
>   p
>   | beam.Create(
> [
>
> ExampleRow(id=1,name='adsf', load_date_time='2023-04-05
> 12:34:55'),
> ExampleRow(id=3,name='adsf', load_date_time='2023-04-05
> 12:34:56')
> ]).with_output_types(ExampleRow)
>   | 'Write to jdbc' >> WriteToJdbc(
>   driver_class_name='org.postgresql.Driver',
>   jdbc_url='jdbc:postgresql://localhost:5432/postgres',
>   username='postgres',
>   password='postgres',
>   table_name= 'test',
>   connection_properties="stringtype=unspecified",
>   statement='INSERT INTO test \
> VALUES(?,?,?) '
>   ))
>
> The problem is that I haven't been able to insert the register
> omitting the id field into the pcollection in a way the database
> automatically assigns the auto increment id.
>
> Can you help me? I have spent a lot of time but i have not realize the
> solution.
>
> Thanks!!
>
>
>
>


Re: Cross Language Runtime error python-Java

2024-02-24 Thread XQ Hu via user
Does your code work without the launcher? Better check this step by step to
figure out which part causes this error.

On Sat, Feb 24, 2024 at 3:25 AM George Dekermenjian 
wrote:

> I have a python pipeline that uses the bigquery storage write method
> (cross language with Java). I’m building launcher and worker docker images
> and then launching the flex template. The launcher fails due to the
> following runtime error in dataflow.
>
> I’m using runner v2 and it is a streaming pipeline using the streaming
> engine.
>
> Any ideas of what is causing this?
>
> RuntimeError:
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.ClassCastException: class
> org.apache.beam.sdk.transforms.windowing.Repeatedly cannot be cast to class
> org.apache.beam.sdk.transforms.windowing.Trigger$OnceTrigger
> (org.apache.beam.sdk.transforms.windowing.Repeatedly and
> org.apache.beam.sdk.transforms.windowing.Trigger$OnceTrigger are in unnamed
> module of loader 'app')"
> line: "exec.go:66"
>
>


Re: Query about `JdbcIO`

2024-02-24 Thread XQ Hu via user
I did not find BEAM-13846 but this suggests String is never supported:

https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java#L59

However, you could use the code from the test to create yours.

On Thu, Feb 22, 2024 at 11:20 AM Vardhan Thigle via user <
user@beam.apache.org> wrote:

> Hi,
> I had a small query about `JdbcIO`.
> As per the documentation
> 
>  `readWithPartitions` is supported for  Long, DateTime
> 
> , String types for the partition column but on top of the tree code, 
> `PRESET_HELPERS`
> (ref
> )
> support only Long and DateTime.
>
> Was the support for `String` rolled back? If yes could you please help me
> with the exact problem that caused the rollback (or any pointers to a
> previous Issue)?
>
> Regards and Thanks,
> Vardhan Thigle,
> +919535346204 <+91%2095353%2046204>
>
> Regards and Thanks,
> Vardhan Thigle,
> +919535346204 <+91%2095353%2046204>
>


Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread XQ Hu via user
I do not think it enforces a reshuffle by just checking the doc here:
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys

Have you tried to just add ReShuffle after PubsubLiteIO?
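A minimal sketch of that suggestion; the Create step and ExpensiveFn are placeholders standing in for the PubsubLiteIO read and the costly ParDo:

import apache_beam as beam


class ExpensiveFn(beam.DoFn):
    # Placeholder for the costly per-element work described above.
    def process(self, element):
        yield element * 2


with beam.Pipeline() as p:
    _ = (
        p
        | "Read (stand-in)" >> beam.Create(range(10))  # replace with the PubsubLiteIO read
        # Reshuffle materializes the elements, so the expensive ParDo is no longer
        # fused with the read and can be scaled independently.
        | "Break fusion" >> beam.Reshuffle()
        | "Expensive work" >> beam.ParDo(ExpensiveFn())
        | beam.Map(print)
    )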

On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com  wrote:

> Hey guys,
>
> I have a question, does withkeys transformation enforce a reshuffle?
>
> My pipeline basically look like this PubsubLiteIO -> ParDo(..) -> ParDo()
> -> BigqueryIO.write()
>
> The problem is PubsubLiteIO -> ParDo(..) -> ParDo() always fused together.
> But The ParDo is expensive and I want dataflow to have more workers to work
> on that, what's the best way to do that?
>
> Regards,
>
>


Re: Beam 2.53.0 Release

2024-01-05 Thread XQ Hu via user
Great! And thank you!

On Fri, Jan 5, 2024 at 2:49 PM Jack McCluskey via user 
wrote:

> We are happy to present the new 2.53.0 release of Beam.
> This release includes both improvements and new functionality.
> For more information on changes in 2.53.0, check out the detailed release
> notes (https://github.com/apache/beam/milestone/17).
>
> Highlights
> * Python streaming users that use 2.47.0 and newer versions of Beam should
> update to version 2.53.0, which fixes a known issue (
> https://github.com/apache/beam/issues/27330)
>
> I/Os
> * TextIO now supports skipping multiple header lines (Java)(
> https://github.com/apache/beam/issues/17990)
> * Python GCSIO is now implemented with GCP GCS Client instead of apitools (
> https://github.com/apache/beam/issues/25676)
> * Adding support for LowCardinality DataType in ClickHouse (Java) (
> https://github.com/apache/beam/pull/29533)
> * Added support for handling bad records to KafkaIO (Java) (
> https://github.com/apache/beam/pull/29546)
> * Add support for generating text embeddings in MLTransform for Vertex AI
> and Hugging Face Hub models. (https://github.com/apache/beam/pull/29564)
> * NATS IO connector added (Go) (
> https://github.com/apache/beam/issues/29000)
>
> New Features / Improvements
> * The Python SDK now type checks `collections.abc.Collections` types
> properly. Some type hints that were erroneously allowed by the SDK may now
> fail (https://github.com/apache/beam/pull/29272)
> * Running multi-language pipelines locally no longer requires Docker.
>   Instead, the same (generally auto-started) subprocess used to perform the
>   expansion can also be used as the cross-language worker.
> * Framework for adding Error Handlers to composite transforms added in
> Java (https://github.com/apache/beam/pull/29164)
> * Python 3.11 images now include google-cloud-profiler (
> https://github.com/apache/beam/pull/29651)
>
> Breaking Changes
> * Upgraded to go 1.21.5 to build, fixing CVE-2023-45285 (
> https://security-tracker.debian.org/tracker/CVE-2023-45285) and
> CVE-2023-39326 (https://security-tracker.debian.org/tracker/CVE-2023-39326
> )
>
> Deprecations
> * Euphoria DSL is deprecated and will be removed in a future release (not
> before 2.56.0) (https://github.com/apache/beam/issues/29451)
>
> Bugfixes
> * (Python) Fixed sporadic crashes in streaming pipelines that affected
> some users of 2.47.0 and newer SDKs (
> https://github.com/apache/beam/issues/27330)
> * (Python) Fixed a bug that caused MLTransform to drop identical elements
> in the output PCollection (https://github.com/apache/beam/issues/29600)
>
> Thanks,
>
> Jack McCluskey
> --
>
>
> Jack McCluskey
> SWE - DataPLS PLAT/ Dataflow ML
> RDU
> jrmcclus...@google.com
>
>
>


Re: Removing old dataflow jobs

2024-01-02 Thread XQ Hu via user
You can archive jobs now:
https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#archive

On Tue, Jan 2, 2024 at 8:47 AM Svetak Sundhar via user 
wrote:

> Hello Sumit,
>
> There is no requirement to delete old jobs, though you can archive
> completed jobs via a recently released feature [1]. There is no limit to
> the number of old jobs that are accumulated. Further, the UI will show all
> running Dataflow jobs and all jobs run in the last 30 days [2].
>
> Hope this helps!
>
> [1]
> https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#archive
>
> [2]
> https://cloud.google.com/dataflow/docs/guides/monitoring-overview
>
>
>
> Svetak Sundhar
>
>   Data Engineer
> s vetaksund...@google.com
>
>
>
> On Tue, Jan 2, 2024 at 6:11 AM Sumit Desai via user 
> wrote:
>
>> Hi all,
>>
>> I have integrated Google Dataflow with my application. After every run,
>> there is one job created under Dataflow as shown in the screenshot.
>>
>> Do I need to delete the old jobs? Is there any restriction on the number
>> of old jobs that can get accumulated here?
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>


Re: [Question] S3 Token Expiration during Read Step

2023-12-22 Thread XQ Hu via user
Can you share some code snippets about how to read from S3? Do you use the
builtin TextIO?

On Fri, Dec 22, 2023 at 11:28 AM Ramya Prasad via user 
wrote:

> Hello,
>
> I am a developer trying to use Apache Beam, and I have a nuanced problem I
> need help with. I have a pipeline which has to read in 40 million records
> from multiple Parquet files from AWS S3. The only way I can get the
> credentials I need for this particular bucket is to call an API, which I do
> before the pipeline executes, and then I store the credentials in the
> PipelineOptions for the pipeline to use during the read. However, the
> credentials are only valid for one hour, and my pipeline takes longer than
> one hour to run. So after an hour of execution, the pipeline fails with a
> credentials invalidation error. The only way I can refresh the credentials
> is by calling the API. Is there a way for me to do this in my pipeline
> while it's running?
>
> Any help would be appreciated!
>
> Thanks and sincerely,
> Ramya
> --
>
> The information contained in this e-mail may be confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>


Re: [Question] WaitOn for Reading Step

2023-12-22 Thread XQ Hu via user
When I search the Beam code base, there are plenty of places that use Wait.on. You could check that code for some insights.
If this doesn't work, it would be better to create a small test case to
reproduce the problem and open a Github issue.
Sorry, I cannot help too much with this.

On Fri, Dec 22, 2023 at 11:28 AM Ramya Prasad via user 
wrote:

> Hello,
>
> I am a developer trying to use Apache Beam, and I am running into an issue
> where my WaitOn step is not working as expected. I want my pipeline to read
> all the data from an S3 bucket using ParquetIO before moving on to the rest
> of the steps in my pipeline. However, I see in my DAG that even though
> there is a collect step after all the data is being read in, my pipeline
> still reads from S3 in the subsequent steps. It appears that the Wait.on is
> not actually happening. Is it even possible to wait on a read step? This is
> what my code looks like:
>
> PCollection records = pipeline.apply("Read parquet file in as 
> Generic Records", 
> ParquetIO.read(finalSchema).from(beamReadPath).withConfiguration(configuration));
> PCollection recordsWaited = records
> .apply("Waiting on Read Parquet File", 
> Wait.on(records)).setCoder(AvroCoder.of(GenericRecord.class, finalSchema));
> {Processing of rest of data subsequently}
>
>
>
> Any help would be greatly appreciated, thanks!
>
> Sincerely,
> Ramya
> --
>
> The information contained in this e-mail may be confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-22 Thread XQ Hu via user
You can use the same docker image for both template launcher and Dataflow
job. Here is one example:
https://github.com/google/dataflow-ml-starter/blob/main/tensorflow_gpu.flex.Dockerfile#L60

On Fri, Dec 22, 2023 at 8:04 AM Sumit Desai  wrote:

> Yes, I will have to try it out.
>
> Regards
> Sumit Desai
>
> On Fri, Dec 22, 2023 at 3:53 PM Sofia’s World  wrote:
>
>> I guess so, i am not an expert on using env variables in dataflow
>> pipelines as any config dependencies i  need, i pass them as job input
>> params
>>
>> But perhaps you can configure variables in your docker file (i am not an
>> expert in this either),  as  flex templates use Docker?
>>
>>
>> https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates
>>
>> hth
>>   Marco
>>
>>
>>
>>
>> On Fri, Dec 22, 2023 at 10:17 AM Sumit Desai 
>> wrote:
>>
>>> We are using an external non-public package which expects environmental
>>> variables only. If environmental variables are not found, it will throw an
>>> error. We can't change source of this package.
>>>
>>> Does this mean we will face same problem with flex templates also?
>>>
>>> On Fri, 22 Dec 2023, 3:39 pm Sofia’s World,  wrote:
>>>
 The flex template will allow you to pass input params with dynamic
 values to your data flow job so you could replace the env variable with
 that input? That is, unless you have to have env bars..but from your
 snippets it appears you are just using them to configure one of your
 components?
 Hth

 On Fri, 22 Dec 2023, 10:01 Sumit Desai, 
 wrote:

> Hi Sofia and XQ,
>
> The application is failing because I have loggers defined in every
> file and the method to create a logger tries to create an object of
> UplightTelemetry. If I use flex templated, will the environmental 
> variables
> I supply be loaded before the application gets loaded? If not, it would 
> not
> serve my purpose.
>
> Thanks & Regards,
> Sumit Desai
>
> On Thu, Dec 21, 2023 at 10:02 AM Sumit Desai 
> wrote:
>
>> Thank you HQ. Will take a look at this.
>>
>> Regards,
>> Sumit Desai
>>
>> On Wed, Dec 20, 2023 at 8:13 PM XQ Hu  wrote:
>>
>>> Dataflow VMs cannot know your local env variable. I think you should
>>> use custom container:
>>> https://cloud.google.com/dataflow/docs/guides/using-custom-containers.
>>> Here is a sample project:
>>> https://github.com/google/dataflow-ml-starter
>>>
>>> On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World 
>>> wrote:
>>>
 Hello Sumit
  Thanks. Sorry...I guess if the value of the env variable is always
 the same u can pass it as job params?..though it doesn't sound like a
 viable option...
 Hth

 On Wed, 20 Dec 2023, 09:49 Sumit Desai, 
 wrote:

> Hi Sofia,
>
> Thanks for the response. For now, we have decided not to use flex
> template. Is there a way to pass environmental variables without 
> using any
> template?
>
> Thanks & Regards,
> Sumit Desai
>
> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World 
> wrote:
>
>> Hi
>>  My 2 cents. .have u ever considered using flex templates to run
>> your pipeline? Then you can pass all your parameters at runtime..
>> (Apologies in advance if it does not cover your use case...)
>>
>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, <
>> user@beam.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I have a Python application which is using Apache beam and
>>> Dataflow as runner. The application uses a non-public Python package
>>> 'uplight-telemetry' which is configured using 'extra_packages' while
>>> creating pipeline_options object. This package expects an 
>>> environmental
>>> variable named 'OTEL_SERVICE_NAME' and since this variable is not 
>>> present
>>> in the Dataflow worker, it is resulting in an error during 
>>> application
>>> startup.
>>>
>>> I am passing this variable using custom pipeline options. Code
>>> to create pipeline options is as follows-
>>>
>>> pipeline_options = ProcessBillRequests.CustomOptions(
>>> project=gcp_project_id,
>>> region="us-east1",
>>> job_name=job_name,
>>> 
>>> temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>>> 
>>> staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>>> runner='DataflowRunner',
>>> save_main_session=True,
>>> service_account_email= service_account,
>>> subnetwork=os.environ.get(SUBNETWORK_URL),
>>> 

Re: Environmental variables not accessible in Dataflow pipeline

2023-12-20 Thread XQ Hu via user
Dataflow VMs cannot know your local env variable. I think you should use
custom container:
https://cloud.google.com/dataflow/docs/guides/using-custom-containers. Here
is a sample project: https://github.com/google/dataflow-ml-starter

On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World  wrote:

> Hello Sumit
>  Thanks. Sorry...I guess if the value of the env variable is always the
> same u can pass it as job params?..though it doesn't sound like a
> viable option...
> Hth
>
> On Wed, 20 Dec 2023, 09:49 Sumit Desai,  wrote:
>
>> Hi Sofia,
>>
>> Thanks for the response. For now, we have decided not to use flex
>> template. Is there a way to pass environmental variables without using any
>> template?
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>> On Wed, Dec 20, 2023 at 3:16 PM Sofia’s World 
>> wrote:
>>
>>> Hi
>>>  My 2 cents. .have u ever considered using flex templates to run your
>>> pipeline? Then you can pass all your parameters at runtime..
>>> (Apologies in advance if it does not cover your use case...)
>>>
>>> On Wed, 20 Dec 2023, 09:35 Sumit Desai via user, 
>>> wrote:
>>>
 Hi all,

 I have a Python application which is using Apache beam and Dataflow as
 runner. The application uses a non-public Python package
 'uplight-telemetry' which is configured using 'extra_packages' while
 creating pipeline_options object. This package expects an environmental
 variable named 'OTEL_SERVICE_NAME' and since this variable is not present
 in the Dataflow worker, it is resulting in an error during application
 startup.

 I am passing this variable using custom pipeline options. Code to
 create pipeline options is as follows-

 pipeline_options = ProcessBillRequests.CustomOptions(
 project=gcp_project_id,
 region="us-east1",
 job_name=job_name,
 
 temp_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
 
 staging_location=f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
 runner='DataflowRunner',
 save_main_session=True,
 service_account_email= service_account,
 subnetwork=os.environ.get(SUBNETWORK_URL),
 extra_packages=[uplight_telemetry_tar_file_path],
 setup_file=setup_file_path,
 OTEL_SERVICE_NAME=otel_service_name,
 OTEL_RESOURCE_ATTRIBUTES=otel_resource_attributes
 # Set values for additional custom variables as needed
 )


 And the code that executes the pipeline is as follows-


 result = (
 pipeline
 | "ReadPendingRecordsFromDB" >> read_from_db
 | "Parse input PCollection" >> 
 beam.Map(ProcessBillRequests.parse_bill_data_requests)
 | "Fetch bills " >> 
 beam.ParDo(ProcessBillRequests.FetchBillInformation())
 )

 pipeline.run().wait_until_finish()

 Is there a way I can set the environmental variables in custom options
 available in the worker?

 Thanks & Regards,
 Sumit Desai

>>>


Re: Specifying dataflow template location with Apache beam Python SDK

2023-12-18 Thread XQ Hu via user
https://github.com/google/dataflow-ml-starter/tree/main?tab=readme-ov-file#run-the-beam-pipeline-with-dataflow-flex-templates
has a full example about how to create your own flex template. FYI.

On Mon, Dec 18, 2023 at 5:01 AM Bartosz Zabłocki via user <
user@beam.apache.org> wrote:

> Hi Sumit,
> could you elaborate a little bit more on what you are trying to achieve
> with the templates?
>
> As far as I know, these base Docker images serve as base images for your
> own custom templates.
> If you want to use an existing template, you can use one of these:
> https://cloud.google.com/dataflow/docs/guides/templates/provided-templates
> .
> To run it, you just need to invoke `gcloud dataflow jobs run... ` or
> equivalent command (
> https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-pubsub#gcloud).
> Or just use the UI to launch it (Cloud Console -> Dataflow -> Jobs ->
> Create Job From Template).
>
> If you want to create your own template (ie a reusable Dataflow pipeline)
> take a look at this page:
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#create_a_flex_template.
> This will let you package your own pipeline as a template. You'll be able
> to launch it with the `gcloud dataflow jobs run...` command.
> If you want to create a custom container image, which gives you more
> control over the environment and dependencies, you can create your own,
> custom Docker image. That's where you'll use the base image you mentioned.
> See this page for an example:
> https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#use_a_custom_container_for_dependencies
> .
>
> I hope this helps, let me know if you have any other questions.
>
> Cheers,
> Bartosz Zablocki
>
> On Mon, Dec 18, 2023 at 8:36 AM Sumit Desai via user 
> wrote:
>
>> I am creating an Apache beam pipeline using Python SDK.I want to use some
>> standard template of dataflow (this one
>> ).
>> But when I am specifying it using 'template_location' key while creating
>> pipeline_options object, I am getting an error `FileNotFoundError: [Errno
>> 2] No such file or directory: '
>> gcr.io/dataflow-templates-base/python310-template-launcher-base'`
>> 
>>
>> I also tried to specify the complete version `
>> gcr.io/dataflow-templates-base/python310-template-launcher-base::flex_templates_base_image_release_20231127_RC00`
>> 
>> but got the same error. Can someone suggest what I might be doing wrong?
>> The code snippet to create pipeline_options is as follows-
>>
>> def __create_pipeline_options_dataflow(job_name):
>>
>>
>> # Set up the Dataflow runner options
>> gcp_project_id = os.environ.get(GCP_PROJECT_ID)
>> # TODO:Move to environmental variables
>> pipeline_options = {
>> 'project': gcp_project_id,
>> 'region': "us-east1",
>> 'job_name': job_name,  # Provide a unique job name
>> 'temp_location':
>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>> 'staging_location':
>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>> 'runner': 'DataflowRunner',
>> 'save_main_session': True,
>> 'service_account_email': service_account,
>> # 'network': f'projects/{gcp_project_id}/global/networks/default',
>> # 'subnetwork':
>> f'projects/{gcp_project_id}/regions/us-east1/subnetworks/default'
>> 'template_location': '
>> gcr.io/dataflow-templates-base/python310-template-launcher-base'
>>
>> }
>> logger.debug(f"pipeline_options created as {pipeline_options}")
>> return pipeline_options
>>
>>
>>


Re: Beam 2.52.0 Release

2023-11-18 Thread XQ Hu via user
Thanks a lot! Great job, Team!

On Fri, Nov 17, 2023 at 7:21 PM Danny McCormick via user <
user@beam.apache.org> wrote:

> I am happy to announce that the 2.52.0 release of Beam has been finalized.
> This release includes both improvements and new functionality.
>
> For more information on changes in 2.52.0, check out the detailed release
> notes - https://github.com/apache/beam/milestone/16. Here is an overview
> of the changes in the release.
>
> Highlights
>
> * Previously deprecated Avro-dependent code (Beam Release 2.46.0) has been
> finally removed from Java SDK "core" package. Please, use
> `beam-sdks-java-extensions-avro` instead. This will allow to easily update
> Avro version in user code without potential breaking changes in Beam "core"
> since the Beam Avro extension already supports the latest Avro versions and
> should handle this. (https://github.com/apache/beam/issues/25252).
> * Publishing Java 21 SDK container images now supported as part of Apache
> Beam release process. (https://github.com/apache/beam/issues/28120)
>   * Direct Runner and Dataflow Runner support running pipelines on Java21
> (experimental until tests fully setup). For other runners (Flink, Spark,
> Samza, etc) support status depend on runner projects.
>
> New Features / Improvements
>
> * Add `UseDataStreamForBatch` pipeline option to the Flink runner. When it
> is set to true, Flink runner will run batch jobs using the DataStream API.
> By default the option is set to false, so the batch jobs are still executed
> using the DataSet API.
> * `upload_graph` as one of the Experiments options for DataflowRunner is
> no longer required when the graph is larger than 10MB for Java SDK (
> https://github.com/apache/beam/pull/28621).
> * State and side input cache has been enabled to a default of 100 MB. Use
> `--max_cache_memory_usage_mb=X` to provide cache size for the user state
> API and side inputs. (Python) (https://github.com/apache/beam/issues/28770
> ).
> * Beam YAML stable release. Beam pipelines can now be written using YAML
> and leverage the Beam YAML framework which includes a preliminary set of
> IO's and turnkey transforms. More information can be found in the YAML root
> folder and in the (
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md
> ).
>
> Breaking Changes
>
> * `org.apache.beam.sdk.io.CountingSource.CounterMark` uses custom
> `CounterMarkCoder` as a default coder since all Avro-dependent classes
> finally moved to `extensions/avro`. If it is still required to use
> `AvroCoder` for `CounterMark`, then, as a workaround, a copy of the "old"
> `CountingSource` class should be placed into the project code and used
> directly
> (https://github.com/apache/beam/issues/25252).
> * Renamed `host` to `firestoreHost` in `FirestoreOptions` to avoid
> potential conflict of command line arguments (Java) (
> https://github.com/apache/beam/pull/29201).
>
> Bugfixes
>
> * Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's
> BigtableIO.BigtableSource when you have more cores than bytes to read
> (Java) (https://github.com/apache/beam/issues/28793).
> * The `watch_file_pattern` arg of RunInference had no effect prior to
> 2.52.0. To use the behavior of the `watch_file_pattern` arg prior to 2.52.0,
> follow the documentation at
> https://beam.apache.org/documentation/ml/side-input-updates/ and use
> `WatchFilePattern` PTransform as a SideInput. (
> https://github.com/apache/beam/pulls/28948)
> * `MLTransform` doesn't output artifacts such as min, max and quantiles.
> Instead, `MLTransform` will add a feature to output these artifacts in a
> human-readable format (https://github.com/apache/beam/issues/29017).
> For now, to use the artifacts such as min and max that were produced by the
> earlier `MLTransform`, use `read_artifact_location` of `MLTransform`, which
> reads artifacts that were produced earlier in a different `MLTransform` (
> https://github.com/apache/beam/pull/29016/)
> * Fixed a memory leak, which affected some long-running Python pipelines: (
> https://github.com/apache/beam/issues/28246).
>
> Security Fixes
>
> * Fixed CVE-2023-39325 - (https://www.cve.org/CVERecord?id=CVE-2023-39325)
> (Java/Python/Go) (https://github.com/apache/beam/issues/29118).
> * Mitigated CVE-2023-47248 - (
> https://nvd.nist.gov/vuln/detail/CVE-2023-47248)  (Python) (
> https://github.com/apache/beam/issues/29392).
>
> Thanks,
> Danny
>
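The `--max_cache_memory_usage_mb` flag mentioned in the notes above is passed like any other Python pipeline option. A minimal sketch, with the runner and the 200 MB value chosen arbitrarily for illustration:

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Cap the user state API / side input cache at 200 MB (arbitrary example value).
options = PipelineOptions([
    '--runner=DirectRunner',
    '--max_cache_memory_usage_mb=200',
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)
```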


Re: [QUESTION] Why no auto labels?

2023-10-03 Thread XQ Hu via user
That suggests the default label is created as that, which indeed causes the
duplication error.

On Tue, Oct 3, 2023 at 9:15 PM Joey Tran  wrote:

> Not sure what that suggests
>
> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user  wrote:
>
>> Looks like this is the current behaviour. If you have `t =
>> beam.Filter(identity_filter)`, `t.label` is defined as
>> `Filter(identity_filter)`.
>>
>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran 
>> wrote:
>>
>>> You don't have to specify the names if the callable you pass in is
>>> /different/ for two `beam.Map`s, but  if the callable is the same you must
>>> specify a label. For example, the below will raise an exception:
>>>
>>> ```
>>> | beam.Filter(identity_filter)
>>> | beam.Filter(identity_filter)
>>> ```
>>>
>>> Here's an example on playground that shows the error message you get
>>> [1]. I marked every line I added with a "# ++".
>>>
>>> It's a contrived example, but using a map or filter at the same pipeline
>>> level probably comes up often, at least in my inexperience. For example,
>>> you might have a pipeline that partitions a pcoll into three different
>>> pcolls, runs some transforms on them, and then runs the same type of filter
>>> on each of them.
>>>
>>> The case that happens most often for me is using the `assert_that` [2]
>>> testing transform. In this case, I think often users will really have no
>>> need for a disambiguating label as they're often just writing unit tests
>>> that test a few different properties of their workflow.
>>>
>>> [1] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
>>> [2]
>>> https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.testing.util.html#apache_beam.testing.util.assert_that
>>>
>>> On Mon, Oct 2, 2023 at 9:08 AM Bruno Volpato via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> If I understand the question correctly, you don't have to specify those
>>>> names.
>>>>
>>>> As Reuven pointed out, it is probably a good idea so you have a stable
>>>> / deterministic graph.
>>>> But in the Python SDK, you can simply use pcollection | map_fn,
>>>> instead of pcollection | 'Map' >> map_fn.
>>>>
>>>> See an example here
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116
>>>>
>>>>
>>>> On Sun, Oct 1, 2023 at 9:08 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Hmm, I'm not sure what you mean by "updating pipelines in place". Can
>>>>> you elaborate?
>>>>>
>>>>> I forgot to mention my question is posed from the context of a python
>>>>> SDK user, and afaict, there doesn't seem to be an obvious way to
>>>>> autogenerate names/labels. Hearing that the java SDK supports it makes me
>>>>> wonder if the python SDK could support it as well though... (If so, I'd be
>>>>> happy to do implement it). Currently, it's fairly tedious to have to name
>>>>> every instance of a transform that you might reuse in a pipeline, e.g. 
>>>>> when
>>>>> reapplying the same Map on different pcollections.
>>>>>
>>>>> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>> Are you talking about transform names? The main reason was because
>>>>>> for runners that support updating pipelines in place, the only way to do 
>>>>>> so
>>>>>> safely is if the runner can perfectly identify which transforms in the 
>>>>>> new
>>>>>> graph match the ones in the old graph. There's no good way to auto 
>>>>>> generate
>>>>>> names that will stay stable across updates - even small changes to the
>>>>>> pipeline might change the order of nodes in the graph, which could result
>>>>>> in a corrupted update.
>>>>>>
>>>>>> However, if you don't care about update, Beam can auto generate these
>>>>>> names for you! When you call PCollection.apply (if using BeamJava), 
>>>>>> simply
>>>>>> omit the name parameter and Beam will auto generate a unique name for 
>>>>>> you.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Sat, Sep 30, 2023 at 11:54 AM Joey Tran 
>>>>>> wrote:
>>>>>>
>>>>>>> After writing a few pipelines now, I keep getting tripped up from
>>>>>>> accidentally having duplicate labels from using multiple of the same
>>>>>>> transforms without labeling them. I figure this must be a common 
>>>>>>> complaint,
>>>>>>> so I was just curious, what the rationale behind this design was? My 
>>>>>>> naive
>>>>>>> thought off the top of my head is that it'd be more user friendly to 
>>>>>>> just
>>>>>>> auto increment duplicate transforms, but I figure I must be missing
>>>>>>> something
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Joey
>>>>>>>
>>>>>>
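To make the failure mode discussed above concrete, here is a minimal sketch; `identity_filter` stands in for any callable reused across the pipeline, and the quoted error text in the comment is paraphrased rather than exact. Applying the same callable twice without labels raises a duplicate-label error, while prefixing each application with a unique name via `>>` avoids it:

```
import apache_beam as beam


def identity_filter(x):
    # Stand-in for any predicate reused in several places.
    return True


with beam.Pipeline() as p:
    nums = p | beam.Create([1, 2, 3])

    # This raises an error along the lines of "A transform with label
    # 'Filter(identity_filter)' already exists in the pipeline", because
    # both applications get the same default label:
    #
    #   bad = (nums
    #          | beam.Filter(identity_filter)
    #          | beam.Filter(identity_filter))

    # Giving each application an explicit, unique label avoids the collision.
    good = (nums
            | 'FilterFirst' >> beam.Filter(identity_filter)
            | 'FilterSecond' >> beam.Filter(identity_filter))
```

For the `assert_that` case mentioned above, the testing utility also accepts a `label` argument, so repeated assertions in one test pipeline can be disambiguated the same way.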


Re: [QUESTION] Why no auto labels?

2023-10-03 Thread XQ Hu via user
Looks like this is the current behaviour. If you have `t =
beam.Filter(identity_filter)`, `t.label` is defined as
`Filter(identity_filter)`.

On Mon, Oct 2, 2023 at 9:25 AM Joey Tran  wrote:

> You don't have to specify the names if the callable you pass in is
> /different/ for two `beam.Map`s, but  if the callable is the same you must
> specify a label. For example, the below will raise an exception:
>
> ```
> | beam.Filter(identity_filter)
> | beam.Filter(identity_filter)
> ```
>
> Here's an example on playground that shows the error message you get [1].
> I marked every line I added with a "# ++".
>
> It's a contrived example, but using a map or filter at the same pipeline
> level probably comes up often, at least in my inexperience. For example,
> you might have a pipeline that partitions a pcoll into three different
> pcolls, runs some transforms on them, and then runs the same type of filter
> on each of them.
>
> The case that happens most often for me is using the `assert_that` [2]
> testing transform. In this case, I think often users will really have no
> need for a disambiguating label as they're often just writing unit tests
> that test a few different properties of their workflow.
>
> [1] https://play.beam.apache.org/?sdk=python=hIrm7jvCamW
> [2]
> https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.testing.util.html#apache_beam.testing.util.assert_that
>
> On Mon, Oct 2, 2023 at 9:08 AM Bruno Volpato via user <
> user@beam.apache.org> wrote:
>
>> If I understand the question correctly, you don't have to specify those
>> names.
>>
>> As Reuven pointed out, it is probably a good idea so you have a stable /
>> deterministic graph.
>> But in the Python SDK, you can simply use pcollection | map_fn, instead
>> of pcollection | 'Map' >> map_fn.
>>
>> See an example here
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116
>>
>>
>> On Sun, Oct 1, 2023 at 9:08 PM Joey Tran 
>> wrote:
>>
>>> Hmm, I'm not sure what you mean by "updating pipelines in place". Can
>>> you elaborate?
>>>
>>> I forgot to mention my question is posed from the context of a python
>>> SDK user, and afaict, there doesn't seem to be an obvious way to
>>> autogenerate names/labels. Hearing that the java SDK supports it makes me
>>> wonder if the python SDK could support it as well though... (If so, I'd be
>>> happy to do implement it). Currently, it's fairly tedious to have to name
>>> every instance of a transform that you might reuse in a pipeline, e.g. when
>>> reapplying the same Map on different pcollections.
>>>
>>> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user 
>>> wrote:
>>>
 Are you talking about transform names? The main reason was because for
 runners that support updating pipelines in place, the only way to do so
 safely is if the runner can perfectly identify which transforms in the new
 graph match the ones in the old graph. There's no good way to auto generate
 names that will stay stable across updates - even small changes to the
 pipeline might change the order of nodes in the graph, which could result
 in a corrupted update.

 However, if you don't care about update, Beam can auto generate these
 names for you! When you call PCollection.apply (if using BeamJava), simply
 omit the name parameter and Beam will auto generate a unique name for you.

 Reuven

 On Sat, Sep 30, 2023 at 11:54 AM Joey Tran 
 wrote:

> After writing a few pipelines now, I keep getting tripped up from
> accidentally having duplicate labels from using multiple of the same
> transforms without labeling them. I figure this must be a common 
> complaint,
> so I was just curious, what the rationale behind this design was? My naive
> thought off the top of my head is that it'd be more user friendly to just
> auto increment duplicate transforms, but I figure I must be missing
> something
>
> Cheers,
> Joey
>



Re: Pandas 2 Timeline Estimate

2023-07-12 Thread XQ Hu via user
https://github.com/apache/beam/issues/27221#issuecomment-1603626880

This tracks the progress.

On Wed, Jul 12, 2023 at 7:37 PM Adlae D'Orazio 
wrote:

> Hello,
>
> I am currently trying to use Interactive Beam to run my pipelines through
> a Jupyter notebook, but I
> have internal packages depending on Pandas 2. When I install Interactive
> Beam, it overrides these dependencies. I was wondering whether there's a
> timeframe for moving Beam to Pandas 2? I know it has only been out
> since late April, so it's not surprising that y'all haven't supported it
> yet. Just curious. Thank you!
>
> Best,
> Adlae
>


Re: Tour of Beam - an interactive Apache Beam learning guide

2023-06-15 Thread XQ Hu via user
We already have a Beam Overview there.
https://beam.apache.org/get-started/tour-of-beam/ contains some good Colab
notebooks, which are mainly just for Python. I suggest we link this to
https://tour.beam.apache.org/ but move the current content under Python
Quickstart.

On Thu, Jun 15, 2023 at 10:11 AM Kerry Donny-Clark via user <
user@beam.apache.org> wrote:

> Thanks for bringing this up Austin. I noticed this as well when I tried to
> search Google for "Tour of Beam". I propose we rebrand the get started Tour
> as "An Intro to Beam", or "An Overview of Beam".  I'll put up a draft PR
> today, and update this thread so folks can comment and review.
> Kerry
>
> On Thu, Jun 15, 2023, 10:00 AM Austin Bennett  wrote:
>
>> Looks like "Tour of Beam" -->
>> https://beam.apache.org/get-started/tour-of-beam/ is distinct from -->
>> https://tour.beam.apache.org/. If these things are indeed not related,
>> should we rename one, deprecate one, or otherwise sort out the
>> branding/naming conflict?
>>
>> On Mon, Jun 12, 2023 at 3:06 PM Kerry Donny-Clark via user <
>> user@beam.apache.org> wrote:
>>
>>> Some of you may not know this, but this is my personal dream project.
>>> Soon after joining Google I thought "beam is hard to learn." I had learned
>>> Go through the excellent "A tour of Go",
>>> and I wished there was something like that for Beam. A series of clear
>>> explanations, with runnable code, and an invitation to play around and
>>> extend the examples. I wrote up a proposal, and I've worked closely with
>>> Alex and his team to execute this vision. The Tour of Beam has delivered on
>>> this dream 110%. I wish this had been around when I was learning Beam, and
>>> I am very happy that new Beam users can get great explanations and examples
>>> in the SDK of their choice.
>>> Kerry
>>>
>>> On Mon, Jun 12, 2023 at 2:13 PM Alex Panin 
>>> wrote:
>>>
 Hi Beam community!

 We invite you to try the Tour Of Beam [1] - an interactive Apache Beam
 learning guide.

 Please share your feedback [2]!

 Key features available for preview:

 * Java, Python, and Go SDK learning journeys covering such topics as
 * Introduction to Beam
 * Common and Core Transforms
 * Windowing
 * Triggers
 * IO Connectors
 * Cross-Language Transforms
 * Trying built-in examples and solving challenges
 * Tracking learning progress (for authenticated users)

 Planned/in-progress items:
 * Support for cross-language examples execution
 * Learning content contribution guide
 * PR 26964 [3] - UI refinement
 * PR 26861 [4] - Final challenge content


 Please submit the feedback form [2] or the built-in “Enjoying Tour Of Beam?”
 form to provide your feedback and suggestions!

 Thank you for trying out the Tour Of Beam!

 Thanks,
 Alex, on behalf of Tour Of Beam team

 [1] - https://tour.beam.apache.org
 [2] -
 https://docs.google.com/forms/d/e/1FAIpQLSdI3yTmsCtk_neVPt0zQOPSmxDBlz3uX2AcmUpoNT6iGEwkUQ/viewform?usp=pp_url
 [3] https://github.com/apache/beam/pull/26964
 [4] https://github.com/apache/beam/pull/26861




Re: [Error] Unable to submit job to Dataflow Runner V2

2023-05-27 Thread XQ Hu via user
Can you check whether your code has any options that contain any of
[disable_runner_v2, disable_prime_runner_v2,
disable_prime_streaming_engine]?

On Sat, May 27, 2023 at 5:29 AM Mário Costa via user 
wrote:

> I have a pipeline built using Apache Beam java SDK version 2.46.0 when
> submitting a job using:
>
> gcloud dataflow jobs run cloud-pubsub-group-events-to-avro-$USER-`date
> +"%Y%m%d-%H%M"` \
> --gcs-location gs://$TEMPLATES_BUCKET_NAME/images/$STAGING_DATE/$TEMPLATE_NAME \
> --project $PROJECT_ID --region $REGION \
> --staging-location gs://$STAGING_BUCKET_NAME/images/$STAGING_DATE/staging/ \
> --service-account-email=$SERVICE_ACCOUNT_EMAIL \
> --worker-machine-type=n1-standard-8 \
> --additional-experiments=use_runner_v2 \
> --num-workers=2 \
> --max-workers=15 \
> --worker-region=$REGION \
> --enable-streaming-engine \
> --parameters \
> inputTopic=$INPUT_TOPIC_NAME
>
> I get the following error:
>
> The workflow could not be created because portable pipelines using Apache
> Beam 2.46.0 must use Dataflow Runner V2, but Dataflow Runner V2 is disabled
> by one or more of the following experiments [disable_runner_v2,
> disable_prime_runner_v2, disable_prime_streaming_engine]. To create the
> workflow, enable Dataflow Runner V2.
>
> The following descriptions seem to be outdated:
> https://cloud.google.com/dataflow/docs/runner-v2.
>
> Does anyone know a solution for this issue ?
> Thanks
>
> --
>
> Mario Costa
> Data Analytics Senior Software Developer
>


Re: Apache beam

2023-05-06 Thread XQ Hu via user
You could create a batch pipeline that reads from GCS and writes to BigQuery.
You can also use this template:
https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-bigquery

On Sat, May 6, 2023 at 1:10 AM Utkarsh Parekh 
wrote:

> Hi,
>
> I'm writing a simple streaming Beam application. The application job is
> doing the following tasks:
>
> 1. Reads data from GCS bucket (project 1) and loads into Kafka topic
> 2. Reads data from Kafka topic and loads into BigQUery (project 3)
>
> Composer running in Project 1
> Data Flow running in project 2
>
> I'm using BeamRunPythonPipelineOperator and the DataflowConfiguration
> configuration.
>
> Is this the right setup? What would be the gcp_conn_id? Any suggestions?
>
> Utkarsh
>
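A minimal sketch of the batch GCS-to-BigQuery shape suggested above; the bucket, table, schema, and CSV parsing are illustrative placeholders, not details from this thread:

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def parse_line(line):
    # Illustrative parser for a two-column CSV line: "name,score".
    name, score = line.split(',')
    return {'name': name, 'score': int(score)}


options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',             # placeholder
    '--region=us-central1',                 # placeholder
    '--temp_location=gs://my-bucket/temp',  # placeholder
])

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadFromGCS' >> beam.io.ReadFromText('gs://my-bucket/input/*.csv')
     | 'Parse' >> beam.Map(parse_line)
     | 'WriteToBQ' >> beam.io.WriteToBigQuery(
         'my-gcp-project:my_dataset.my_table',
         schema='name:STRING,score:INTEGER',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
```

On the Composer side, the operator's `gcp_conn_id` (commonly `google_cloud_default`) determines which connection's credentials submit the job; this is worth double-checking against the Airflow provider documentation for your version.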


Re: Loosing records when using BigQuery IO Connector

2023-05-03 Thread XQ Hu via user
https://github.com/apache/beam/issues/26515 tracks this issue. The fix was
merged. Thanks a lot for reporting this issue, Binh!

On Mon, Apr 17, 2023 at 12:58 PM Binh Nguyen Van  wrote:

> Hi,
>
> I tested with streaming inserts and file loads, and they all worked as
> expected. But it looks like the Storage API is the new way to go, so I want
> to test it too. I am using Apache Beam v2.46.0 and running it with Google Dataflow.
>
> Thanks
> -Binh
>
>
> On Mon, Apr 17, 2023 at 9:53 AM Reuven Lax via user 
> wrote:
>
>> What version of Beam are you using? There are no known data-loss bugs in
>> the connector, however if there has been a regression we would like to
>> address it with high priority.
>>
>> On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a job that uses BigQuery IO Connector to write to a BigQuery
>>> table. When I test it with a small number of records (100) it works as
>>> expected but when I tested it with a larger number of records (1), I
>>> don’t see all of the records written to the output table but only part of
>>> it. It changes from run to run but no more than 1000 records as I have seen
>>> so far.
>>>
>>> There are WARNING log entries in the job log like this
>>>
>>> by client #0 failed with error, operations will be retried.
>>> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: 
>>> ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>>>
>>> And one log entry like this
>>>
>>> Finalize of stream [stream-name] finished with row_count: 683
>>>
>>> If I sum all the numbers reported in the WARNING message with the one in
>>> the finalize of stream above, I get 1, which is exactly the number
>>> of input records.
>>>
>>> My pipeline uses 1 worker and it looks like this
>>>
>>> WriteResult writeResult = inputCollection.apply(
>>> "Save Events To BigQuery",
>>> BigQueryIO.write()
>>> .to(options.getTable())
>>> .withFormatFunction(TableRowMappers::toRow)
>>> .withMethod(Method.STORAGE_WRITE_API)
>>> 
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>> 
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>> .withExtendedErrorInfo());
>>>
>>> writeResult
>>> .getFailedStorageApiInserts()
>>> .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>>>
>>> There are no log entries for the failed inserts.
>>>
>>> Is there anything wrong with my pipeline code or is it a bug in BigQuery
>>> IO Connector?
>>>
>>> Thanks
>>> -Binh
>>>
>>


Re: Loosing records when using BigQuery IO Connector

2023-04-17 Thread XQ Hu via user
Does FILE_LOADS (
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#FILE_LOADS)
work for your case?
STORAGE_WRITE_API has been actively improved. If the latest SDK
still has this issue, I highly recommend creating a Google Cloud
support ticket.

On Mon, Apr 17, 2023 at 3:47 AM Binh Nguyen Van  wrote:

> Hi,
>
> I have a job that uses BigQuery IO Connector to write to a BigQuery table.
> When I test it with a small number of records (100) it works as expected
> but when I tested it with a larger number of records (1), I don’t see
> all of the records written to the output table but only part of it. It
> changes from run to run but no more than 1000 records as I have seen so far.
>
> There are WARNING log entries in the job log like this
>
> by client #0 failed with error, operations will be retried.
> com.google.cloud.bigquery.storage.v1.Exceptions$OffsetAlreadyExists: 
> ALREADY_EXISTS: The offset is within stream, expected offset 933, received 0
>
> And one log entry like this
>
> Finalize of stream [stream-name] finished with row_count: 683
>
> If I sum all the numbers reported in the WARNING message with the one in
> the finalize of stream above, I get 1, which is exactly the number of
> input records.
>
> My pipeline uses 1 worker and it looks like this
>
> WriteResult writeResult = inputCollection.apply(
> "Save Events To BigQuery",
> BigQueryIO.write()
> .to(options.getTable())
> .withFormatFunction(TableRowMappers::toRow)
> .withMethod(Method.STORAGE_WRITE_API)
> 
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
> 
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
> .withExtendedErrorInfo());
>
> writeResult
> .getFailedStorageApiInserts()
> .apply("Log failed inserts", ParDo.of(new PrintFn<>()));
>
> There are no log entries for the failed inserts.
>
> Is there anything wrong with my pipeline code or is it a bug in BigQuery
> IO Connector?
>
> Thanks
> -Binh
>