Re: Easy Multi-language via a SchemaTransform-aware Expansion Service

2022-11-23 Thread Chamikara Jayalath via dev
Hi All,

The implementation of https://s.apache.org/easy-multi-language (with the
dynamic API for Python) was merged and should be available with Beam
2.44.0: https://github.com/apache/beam/pull/23413

Thanks,
Cham

On Fri, Aug 19, 2022 at 3:35 PM Chamikara Jayalath wrote:

> Hi All,
>
> Thanks for the comments so far. Seems like we generally agree on this
> proposal.
>
> Please see https://github.com/apache/beam/pull/22802 for a prototype
> implementation that adds the following.
>
> * Support for dynamically discovering and registering SchemaTransforms in
> the Java expansion service (see the provider sketch below).
> * Support for dynamically discovering registered SchemaTransforms from the
> Python side.
> * Support for using SchemaTransforms in Python pipelines.
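>
> For concreteness, here is a rough Java sketch of what a discoverable
> provider could look like. It follows the prototype's
> TypedSchemaTransformProvider and SchemaTransform shapes (the latter
> treated as a single-method interface returning the expanded transform),
> both of which may still change; the Config class, URN, and filtering
> logic are made up for illustration.
>
> import com.google.auto.service.AutoService;
> import java.util.Collections;
> import java.util.List;
> import org.apache.beam.sdk.schemas.JavaFieldSchema;
> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
> import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
> import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
> import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionRowTuple;
> import org.apache.beam.sdk.values.Row;
>
> /** Registered via ServiceLoader so the expansion service can discover it. */
> @AutoService(SchemaTransformProvider.class)
> public class RowFilterProvider
>     extends TypedSchemaTransformProvider<RowFilterProvider.Config> {
>
>   /** Hypothetical configuration pojo; its schema is inferred from its fields. */
>   @DefaultSchema(JavaFieldSchema.class)
>   public static class Config {
>     public String keepPattern;
>   }
>
>   @Override
>   public String identifier() {
>     // The URN that DiscoverSchemaTransform would report back to other SDKs.
>     return "my_org:schematransform:row_filter:v1";
>   }
>
>   @Override
>   public Class<Config> configurationClass() {
>     return Config.class;
>   }
>
>   @Override
>   public List<String> inputCollectionNames() {
>     return Collections.singletonList("input");
>   }
>
>   @Override
>   public List<String> outputCollectionNames() {
>     return Collections.singletonList("output");
>   }
>
>   @Override
>   public SchemaTransform from(Config config) {
>     String keepPattern = config.keepPattern; // capture only what's needed
>     return () ->
>         new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
>           @Override
>           public PCollectionRowTuple expand(PCollectionRowTuple input) {
>             PCollection<Row> kept =
>                 input.get("input")
>                     .apply(Filter.by((Row row) ->
>                         row.getString("name") != null
>                             && row.getString("name").matches(keepPattern)));
>             return PCollectionRowTuple.of("output", kept);
>           }
>         };
>   }
> }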
>
> Feel free to add more comments to the doc and/or the PR.
>
> Thanks,
> Cham
>
>
> On Mon, Aug 8, 2022 at 9:34 PM Chamikara Jayalath wrote:
>
>> I think the *DiscoverSchemaTransform()* RPC introduced in this proposal
>> and the ability to easily deploy/use available *SchemaTransforms* using
>> an expansion service essentially provide the tooling necessary for
>> implementing such a service. Such a service could even start up expansion
>> services to discover/list transforms available in given artifacts (for
>> example, jar files).
>>
>> Thanks,
>> Cham
>>
>> On Mon, Aug 8, 2022 at 3:48 PM Byron Ellis wrote:
>>
>>> I like that idea, sort of like Kafka’s Schema Service but for transforms?
>>>
>>> On Mon, Aug 8, 2022 at 2:45 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
 This is a great idea. I would like to approach this from the
 perspective of making it easy to provide a catalog of well-defined
 transforms for use in expansion services from typical SDKs and also
 elsewhere (e.g. for documentation purposes, GUIs, etc.) Ideally
 everything about what a transform is (its config, documentation,
 expectations on inputs, etc.) can be specified programmatically in a
 way that's much easier to both author and consume than it is now.

 On Thu, Aug 4, 2022 at 6:51 PM Chamikara Jayalath via dev wrote:
 >
 > Hi All,
 >
 > I believe we can make the multi-language pipelines offering [1] much
 easier to use by updating the expansion service to be fully aware of
 SchemaTransforms. Additionally, this will make it easy to
 register/discover/use transforms defined in one SDK from all other SDKs.
 Specifically, we could add the following features.
 >
 > * Expansion service can be used to easily initialize and expand
 transforms without the need for additional code.
 > * Expansion service can be used to easily discover already registered
 transforms.
 > * Pipeline SDKs can generate user-friendly stub-APIs based on
 transforms registered with an expansion service, eliminating the need to
 develop language-specific wrappers.
 >
 > Please see here for my proposal:
 https://s.apache.org/easy-multi-language
 >
 > Lemme know if you have any comments/questions/suggestions :)
 >
 > Thanks,
 > Cham
 >
 > [1]
 https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
 >

>>>


Re: Getting familiar with Go SDK

2022-11-23 Thread Robert Burke
Hello and welcome!

On Wed, Nov 23, 2022, 10:40 AM Shivam Singhal wrote:

> Hi folks,
>
> I have been extensively using the Java SDK in my daily work, and haven't
> yet touched the Go SDK.
> But I will soon (in ~1 month) have plenty of time, and I would like to
> contribute to the Go SDK. To help with this, I have a few questions:
>
>1. I have cloned the beam repo. I would like to know which particular
>code files I should start reading to build familiarity.
>
> What use cases are you looking to extend or add?

There's lots to do, but the best place to start is what *you* would like
the SDK to work well with. Is there a use case you'd like to enable? A
specific pipeline you'd like to improve?

In my experience, contributions for the sake of contributions don't turn
out as well as having "I need this to do X so Y works!" as a motivation.

Otherwise, so you aren't left hanging, I refer you to some of the talks
I've given on getting started with navigating around the SDK.

https://youtu.be/WcuS8ojHfyU

It's a bit older now, but still accurate at the scale it's talking about.

Beam SDKs are complex, but the Go SDK is pretty easy to navigate by virtue
of being Go. My recommendation is just to follow the call stack (what
happens when beam.ParDo is called? How does the execution harness in the
worker containers work, in harness/harness.go?) and see where that takes you.

Just be aware that, ultimately, we do not have the resources to hold your
hand and teach you both Beam and Go. Not saying that was your intent, but
we do have jobs to do, and our own things to get done. :)

That said, I will seek to maintain good Go style through the code review
process, and we'll avoid Beam correctness issues through the reviews, so
it's not that hard a line.

>
>2. Is there a roadmap for Go SDK I can look at? Is this Go SDK being
>used in production somewhere?
>
> We tried to keep one up to date, but ultimately stopped. At this point,
the roadmap is "what do you want working?"

The best place to start is to look at the GitHub issues labeled "go".
https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+label%3Ago

If it's unassigned, it's fair game to '.take-issue'. Feel free to ping me
(@lostluck) if you have any questions about it.

You can see a rough view of the SDK's capabilities and direction in the
State of the Go SDK 2022 talk: https://youtu.be/e3yx4SwHJK0

Please let me know if there is a better place to ask.
>

This is the right place to ask!

>
> Thanks,
> Shivam
>

Robert Burke
Beam Go Busybody


Getting familiar with Go SDK

2022-11-23 Thread Shivam Singhal
Hi folks,

I have been extensively using the Java SDK in my daily work, and haven't
yet touched the Go SDK.
But I will soon (in ~1 month) have plenty of time, and I would like to
contribute to the Go SDK. To help with this, I have a few questions:

   1. I have cloned the beam repo. I would like to know which particular
   code files I should start reading to build familiarity.
   2. Is there a roadmap for Go SDK I can look at? Is this Go SDK being
   used in production somewhere?

Please let me know if there is a better place to ask.

Thanks,
Shivam


Re: Beam, Flink state and Avro Schema Evolution is problematic

2022-11-23 Thread Alexey Romanenko
+ dev

Many thanks for sharing your observations and findings on this topic, Cristian!
I copy it to dev@ as well to attract more attention to this problem.

—
Alexey


> On 18 Nov 2022, at 18:21, Cristian Constantinescu wrote:
> 
> Hi everyone,
> 
> I'm using Beam on Flink with Avro-generated records. If the record
> schema changes, the Flink state cannot be restored. I want to send
> this email out for anyone who may need this info in the future, and
> also to ask others for possible solutions. This problem is so easily
> hit that I'm having a hard time figuring out how other users of Beam
> on the Flink runner circumvent it.
> 
> The in-depth discussion of the issue can be found here [1] (thanks
> Maximilian). There are also a few more emails about this here [2], and
> here [3].
> 
> The gist of the issue is that Beam serializes the coders used into the
> Flink state, and some of those coders hold references to the
> Bean/Pojo/Java classes they serialize/deserialize to. Flink
> serializes its state using Java serialization, which means the Flink
> state ends up holding a reference to the Bean/Pojo/Java class name
> and the related serialVersionUID. When the (Avro-generated) pojo
> changes, so does its serialVersionUID, and Flink can no longer
> deserialize the Beam state because the serialVersionUID doesn't match:
> not on the Coder, but on the Pojo type that coder was holding when it
> got serialized.
> 
> I decided to try each coder capable of handling Pojos, one by one, to
> see if any would work. That is, I tried SerializableCoder, AvroCoder,
> and SchemaCoder/RowCoder. For AvroCoder I used the SpecificRecord
> version (not the GenericRecord one), and for SerializableCoder the
> non-Row version (i.e., the one that returns a Pojo type, not a Row
> type). They all failed the test below (written out to be very
> explicit, but really, it's just simple schema evolution).
> 
> Test:
> 1. Create an Avro pojo (IDL-generated):
> record FooRecord {
>   union {null, string} dummy1 = null;
> }
> 2. Create a pipeline with a simple stateful DoFn (see the sketch after
> these steps), set the desired coder for FooRecord (I tried
> SerializableCoder, AvroCoder, and SchemaCoder/RowCoder), and populate
> state with a few FooRecord objects.
> 3. Start the pipeline.
> 4. Stop the pipeline with a savepoint.
> 5. Augment FooRecord to add another field after dummy1.
> 6. Start the pipeline, restoring from the saved savepoint.
> 7. Observe this exception when deserializing the savepoint:
> "Caused by: java.io.InvalidClassException: com.mymodels.FooRecord;
> local class incompatible: stream classdesc serialVersionUID = <some
> number>, local class serialVersionUID = <another number>"
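>
> For reference, here is a minimal sketch of the stateful DoFn from step
> 2. The keyed-input shape and the "last seen" logic are made up for
> illustration; the coder on the state cell is the one under test
> (AvroCoder here, imported from org.apache.beam.sdk.coders as of the
> version I'm on):
>
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
>
> class LastSeenFn extends DoFn<KV<String, FooRecord>, FooRecord> {
>
>   @StateId("lastSeen")
>   private final StateSpec<ValueState<FooRecord>> lastSeenSpec =
>       // Swap in SerializableCoder or a SchemaCoder to test the others.
>       StateSpecs.value(AvroCoder.of(FooRecord.class));
>
>   @ProcessElement
>   public void process(
>       @Element KV<String, FooRecord> element,
>       @StateId("lastSeen") ValueState<FooRecord> lastSeen,
>       OutputReceiver<FooRecord> out) {
>     FooRecord previous = lastSeen.read();
>     if (previous != null) {
>       out.output(previous);
>     }
>     // This write is what ends up serialized inside the Flink savepoint.
>     lastSeen.write(element.getValue());
>   }
> }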
> 
> There are a few workarounds.
> 
> Workaround A:
> Right now my working solution is to implement what was suggested by
> Pavel (thanks Pavel) in [3]. Quote from him: "having my business
> logic-related POJOs still Avro-generated, but I introduced another,
> generic one, which just stores schema & payload bytes, and does not
> need to change. then using a DelegateCoder that converts the POJO
> to/from that generic schema-bytes pojo that never changes".
> 
> Basically something like this (a sketch; FlinkStateValue is the Avro IDL
> wrapper record, and org.apache.beam.sdk.util.CoderUtils does the
> byte-array round trip):
>
> record FlinkStateValue {
>   string schema;
>   bytes value;
> }
>
> Coder<FooRecord> fooCoder = AvroCoder.of(FooRecord.class);
> DelegateCoder<FooRecord, FlinkStateValue> delegateCoder = DelegateCoder.of(
>     AvroCoder.of(FlinkStateValue.class),
>     (FooRecord in) -> FlinkStateValue.newBuilder()
>         .setSchema(FooRecord.getClassSchema().toString())
>         .setValue(ByteBuffer.wrap(CoderUtils.encodeToByteArray(fooCoder, in)))
>         .build(),
>     (FlinkStateValue in) ->
>         CoderUtils.decodeFromByteArray(fooCoder, in.getValue().array()));
>
> p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder);
> 
> The downside is that there's now yet another serialization step, which
> wastes CPU cycles. The upside is that things are decoupled: the
> DelegateCoder could use a RowCoder for FooRecord instead of
> AvroCoder.of(FooRecord.class), or any other coder for that matter,
> and you can switch between them with only a code change.
> 
> Workaround B:
> Difficulty hard! Use the Flink state API [4] to rewrite the Beam
> serialized state, updating the FooRecord serialVersionUID stored in
> that state to the new one after the schema evolution; then save the
> state and start your pipeline with the evolved FooRecord.
> 
> Workaround C:
> Wrap the Avro-generated FooRecord in a real Pojo, an AutoValue, or
> anything over which you have full control of the serialVersionUID, and
> use that in your pipeline, especially when putting things into state
> (a minimal sketch follows below).
>
> The problem arises when the Avro-generated records have lots of
> properties and/or nested records. It becomes tedious to essentially
> duplicate them as Pojos/AutoValues.
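>
> For workaround C, a minimal sketch of such a wrapper, assuming the
> FooRecord generated from the IDL above (with Avro's usual
> getDummy1()/newBuilder() accessors); the point is the pinned
> serialVersionUID, which regenerating FooRecord can no longer change:
>
> import java.io.Serializable;
>
> public class FooRecordPojo implements Serializable {
>   // Pinned explicitly; schema evolution no longer affects it.
>   private static final long serialVersionUID = 1L;
>
>   private String dummy1;
>
>   public static FooRecordPojo from(FooRecord record) {
>     FooRecordPojo pojo = new FooRecordPojo();
>     // Avro string fields may surface as CharSequence, hence toString().
>     pojo.dummy1 = record.getDummy1() == null ? null : record.getDummy1().toString();
>     return pojo;
>   }
>
>   public FooRecord toFooRecord() {
>     return FooRecord.newBuilder().setDummy1(dummy1).build();
>   }
> }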
> 
> Conclusion:
> I want to end by asking the community for advice. For those of you
> who use Beam with Avro records running on the Flink runner, how do you
> handle state when the Avro schema inevitably evolves?
> 
> It just seems like such a simple use case and such an easy pitfall
> to hit.

Beam High Priority Issue Report (56)

2022-11-23 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Bug]: Timeout waiting to lock 
kotlin-dsl
https://github.com/apache/beam/issues/24263 [Bug]: Remote call on 
apache-beam-jenkins-3 failed. The channel is closing down or has closed down
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23745 [Bug]: Samza 
AsyncDoFnRunnerTest.testSimplePipeline is flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22321 
PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing 
on jenkins
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21561 
ExternalPythonTransformTest.trivialPythonTransform flaky
https://github.com/apache/beam/issues/21480 flake: 
FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21113 
testTwoTimersSettingEachOtherWithCreateAsInputBounded flaky
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20975 
org.apache.beam.runners.flink.ReadSourcePortableTest.testExecution[streaming: 
false] is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19734 
WatchTest.testMultiplePollsWithManyResults flake: Outputs must be in timestamp 
order (sickbayed)
https://github.com/apache/beam/issues/19465 Explore possibilities to lower 
in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/23906 [Bug]: Dataflow jpms tests fail on 
the 2.43.0 release branch
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/23489 [Bug]: add DebeziumIO to the 
connectors page
https://github.com/apache/beam/issues/23306 [Bug]: BigQueryBatchFileLoads in 
python loses data when using WRITE_TRUNCATE
https://github.com/apache/beam/issues/23286 [Bug]: 
beam_PerformanceTests_InfluxDbIO_IT Flaky > 50 % Fail 
https://github.com/apache/beam/issues/22891 [Bug]: 
beam_PostCommit_XVR_PythonUsingJavaDataflow is flaky
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRun

SqlTransform translation deficiencies

2022-11-23 Thread Moritz Mack
Hi all,

Not sure who’s best to ping. I spent some time yesterday looking into the 
SqlTransform translation of one of the TPC-DS queries and noticed it's generating 
an overly complex transform hierarchy. I've summarized my findings in [1]. It 
would be great to get some more experienced eyes on it, as I am not very familiar 
with the Calcite planner.

Thanks,
Moritz


[1] https://github.com/apache/beam/issues/24314

