Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Robert Bradshaw via dev
This does appear to be a significant missing feature. I'll try to make
sure something easier gets in by the next release. See also below.

On Mon, Jan 8, 2024 at 11:30 AM Ferran Fernández Garrido wrote:
>
> Hi Yarden,
>
> Since it's a bounded source, you could try the Sql transform, grouping
> by the timestamp column. Here are some examples of grouping:
>
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
>
> However, if you want to add a timestamp column in addition to the
> original CSV records, then there are multiple ways to achieve that.
>
> 1) MapToFields:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
> [your timestamp column could be computed by a callable that returns
> the current timestamp for each record]
>
> 2) If you need an extra layer of transformation complexity, I would
> recommend creating a custom transform:
>
> # - type: MyCustomTransform
> #   name: AddDateTimeColumn
> #   config:
> #     prefix: 'whatever'
>
> providers:
>   - type: 'javaJar'
>     config:
>       jar: 'gs://path/of/the/java.jar'
>     transforms:
>       MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Alternatively, if you're more comfortable with that, you can use
PyTransform by invoking it via its fully qualified name.

pipeline:
  transforms:
    ...
    - type: MyAssignTimestamps
      config:
        kwarg1: ...
        kwarg2: ...

providers:
  - type: python
    config:
      packages: ['py_package_identifier']
    transforms:
      MyAssignTimestamps: 'fully_qualified_package.module.AssignTimestampsPTransform'
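For context, here is a minimal sketch of what such a transform might look
like on the Python side (the class name, module, and `timestamp_field`
parameter are hypothetical, and the field is assumed to hold epoch seconds);
the kwargs in the `config` block above are passed to its constructor:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class AssignTimestampsPTransform(beam.PTransform):
    """Sets each element's event timestamp from one of its fields."""

    def __init__(self, timestamp_field='ts'):
        # Name of the row field holding the timestamp, as epoch seconds.
        self._timestamp_field = timestamp_field

    def expand(self, pcoll):
        # Re-emit each element with its event time taken from that field,
        # so downstream windowing sees the intended timestamps.
        return pcoll | beam.Map(
            lambda row: TimestampedValue(
                row, getattr(row, self._timestamp_field)))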



> Best,
> Ferran
>
> On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe () wrote:
> >
> > Hi all,
> > I'm quite new to using Beam YAML. I am working with a CSV file and want to
> > implement some windowing logic on it.
> > Was wondering what is the right way to add timestamps to each element,
> > assuming I have a column containing a timestamp.
> >
> > I am aware of the relevant part of the Beam Programming Guide (apache.org),
> > but not sure how this can be implemented and used from a YAML perspective.
> >
> > Thanks
> > Yarden


Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2024-01-08 Thread Wiśniowski Piotr

Hi Evan,

I actually hit a similar problem a few months ago and finally managed to
solve it. My situation is a bit different, as I am using the Python SDK and
Dataflow Runner v2 (+ Streaming Engine, Prime), and quite a lot of
stateful processing. In my case it failed with a very similar
message, but related to some stateful step.


The thing is, I discovered that updating a pipeline in place can fail even
when submitting exactly the same code. It seems the problem
was that the pipeline graph must be constructed in the same order as the
original graph. In my case I added the steps to the pipeline from an
unordered set, resulting in the same pipeline graph, but the
order of construction apparently matters, and updating the running job
fails if the order differs.


In my case I just sorted the steps to be added to the pipeline by name, and
updating the job in flight started working. So it seems that, since some
recent version, pipeline state on Dataflow somehow depends on the order in
which steps are added to the pipeline (as I recall, this was still working
correctly around 2.50?). Does anyone know if this is intended? If so, I
would like to hear an explanation.
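A minimal sketch of that workaround (names are hypothetical), assuming the
steps are kept in a dict keyed by step name:

def apply_steps(pcoll, steps):
    # steps: dict of step name -> PTransform, possibly built from an
    # unordered collection. Sorting by name makes graph construction
    # deterministic across submissions, which is what the in-place
    # update appears to require.
    for name in sorted(steps):
        # "name >> transform" also gives each step a stable label.
        pcoll = pcoll | name >> steps[name]
    return pcoll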


Best regards

Wiśniowski Piotr

On 15.12.2023 00:14, Evan Galpin wrote:
The pipeline in question is using the Dataflow v1 runner (Runner v2:
Disabled), in case that's an important detail.


On Tue, Dec 12, 2023 at 4:22 PM Evan Galpin  wrote:

I've checked the source code and deployment command for cases of
setting experiments. I don't see "enable_custom_pubsub_source"
being used at all, no.  I also confirmed that it is not active on
the existing/running job.

On Tue, Dec 12, 2023 at 4:11 PM Reuven Lax via user
 wrote:

Are you setting the enable_custom_pubsub_source experiment by
any chance?

On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin
 wrote:

Hi all,

When attempting to upgrade a running Dataflow pipeline
from SDK 2.51.0 to 2.52.0, an incompatibility warning is
surfaced that prevents pipeline upgrade:

The Coder or type for step .../PubsubUnboundedSource
has changed


Was there an intentional coder change introduced for
PubsubMessage in 2.52.0?  I didn't note anything in the
release notes https://beam.apache.org/blog/beam-2.52.0/
nor recent changes in
PubsubMessageWithAttributesCoder [1]. Specifically, the
step uses `PubsubMessageWithAttributesCoder` via
`PubsubIO.readMessagesWithAttributes()`.

Thanks!


[1]

https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36


Re: [YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Ferran Fernández Garrido
Hi Yarden,

Since it's a bounded source, you could try the Sql transform, grouping
by the timestamp column. Here are some examples of grouping:

https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml
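For example, a minimal sketch of that approach (the `ts` column name is
hypothetical; Beam SQL refers to the single input as PCOLLECTION):

- type: Sql
  config:
    query: 'SELECT ts, COUNT(*) AS n FROM PCOLLECTION GROUP BY ts'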

However, if you want to add a timestamp column in addition to the
original CSV records, then there are multiple ways to achieve that.

1) MapToFields:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/yaml_mapping.md
[your timestamp column could be computed by a callable that returns
the current timestamp for each record; see the sketch at the end of
this message]

2) If you need an extra layer of transformation complexity, I would
recommend creating a custom transform:

# - type: MyCustomTransform
#   name: AddDateTimeColumn
#   config:
#     prefix: 'whatever'

providers:
  - type: 'javaJar'
    config:
      jar: 'gs://path/of/the/java.jar'
    transforms:
      MyCustomTransform: 'beam:transform:org.apache.beam:javatransformation:v1'

Here's a good example of how to do that in Java:
https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java
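For option 1, a minimal sketch of a MapToFields step that appends a
wall-clock timestamp column to each record via a callable (the `event_ts`
column name is hypothetical, and `append: true` is assumed to retain the
original CSV fields, as described in the yaml_mapping.md doc linked above):

- type: MapToFields
  config:
    language: python
    append: true
    fields:
      event_ts:
        callable: |
          import time
          def add_ts(row):
            return time.time()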

Best,
Ferran

On Mon, Jan 8, 2024 at 19:53, Yarden BenMoshe () wrote:
>
> Hi all,
> I'm quite new to using Beam YAML. I am working with a CSV file and want to
> implement some windowing logic on it.
> Was wondering what is the right way to add timestamps to each element,
> assuming I have a column containing a timestamp.
>
> I am aware of the relevant part of the Beam Programming Guide (apache.org),
> but not sure how this can be implemented and used from a YAML perspective.
>
> Thanks
> Yarden


[YAML] add timestamp to a bounded PCollection

2024-01-08 Thread Yarden BenMoshe
Hi all,
I'm quite new to using Beam YAML. I am working with a CSV file and want to
implement some windowing logic on it.
Was wondering what is the right way to add timestamps to each element,
assuming I have a column containing a timestamp.

I am aware of the relevant part of the Beam Programming Guide (apache.org),
but not sure how this can be implemented and used from a YAML perspective.

Thanks
Yarden


Beam High Priority Issue Report (51)

2024-01-08 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/29926 [Bug]: FileIO: lack of timeouts may 
cause the pipeline to get stuck indefinitely
https://github.com/apache/beam/issues/29912 [Bug]: floatValueExtractor judge 
float and double equality directly
https://github.com/apache/beam/issues/29825 [Bug]: Usage of logical types 
breaks Beam YAML Sql
https://github.com/apache/beam/issues/29413 [Bug]: Can not use Avro over 1.8.2 
with Beam 2.52.0
https://github.com/apache/beam/issues/29099 [Bug]: FnAPI Java SDK Harness 
doesn't update user counters in OnTimer callback functions
https://github.com/apache/beam/issues/29022 [Failing Test]: Python Github 
actions tests are failing due to update of pip 
https://github.com/apache/beam/issues/28760 [Bug]: EFO Kinesis IO reader 
provided by apache beam does not pick the event time for watermarking
https://github.com/apache/beam/issues/28715 [Bug]: Python WriteToBigtable get 
stuck for large jobs due to client dead lock
https://github.com/apache/beam/issues/28383 [Failing Test]: 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testMaxThreadMetric
https://github.com/apache/beam/issues/28339 Fix failing 
"beam_PostCommit_XVR_GoUsingJava_Dataflow" job
https://github.com/apache/beam/issues/28326 Bug: 
apache_beam.io.gcp.pubsublite.ReadFromPubSubLite not working
https://github.com/apache/beam/issues/28142 [Bug]: [Go SDK] Memory seems to be 
leaking on 2.49.0 with Dataflow
https://github.com/apache/beam/issues/27892 [Bug]: ignoreUnknownValues not 
working when using CreateDisposition.CREATE_IF_NEEDED 
https://github.com/apache/beam/issues/27648 [Bug]: Python SDFs (e.g. 
PeriodicImpulse) running in Flink and polling using tracker.defer_remainder 
have checkpoint size growing indefinitely 
https://github.com/apache/beam/issues/27616 [Bug]: Unable to use 
applyRowMutations() in bigquery IO apache beam java
https://github.com/apache/beam/issues/27486 [Bug]: Read from datastore with 
inequality filters
https://github.com/apache/beam/issues/27314 [Failing Test]: 
bigquery.StorageApiSinkCreateIfNeededIT.testCreateManyTables[1]
https://github.com/apache/beam/issues/27238 [Bug]: Window trigger has lag when 
using Kafka and GroupByKey on Dataflow Runner
https://github.com/apache/beam/issues/26911 [Bug]: UNNEST ARRAY with a nested 
ROW (described below)
https://github.com/apache/beam/issues/26343 [Bug]: 
apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests.test_read_queries is 
flaky
https://github.com/apache/beam/issues/26329 [Bug]: BigQuerySourceBase does not 
propagate a Coder to AvroSource
https://github.com/apache/beam/issues/26041 [Bug]: Unable to create 
exactly-once Flink pipeline with stream source and file sink
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21476 WriteToBigQuery Dynamic table 
destinations returns wrong tableId
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests