Re: [Question] Apache Beam Pipeline AWS Credentials

2024-01-10 Thread Alexey Romanenko
Hi Ramya,

I don’t think there is an out-of-the-box solution for such a case, but I believe 
you can create your own AwsCredentialsProvider and implement the logic to fetch 
the credentials dynamically.
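
For illustration, here is a minimal sketch of such a provider. It assumes the AWS 
SDK v2 interfaces used by Beam's AwsOptions; fetchFreshCredentials() and 
sessionLifetime() are placeholders for however your environment issues short-lived 
credentials - they are not Beam or AWS SDK APIs:

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;

// Re-fetches session credentials shortly before the current ones expire.
public abstract class RefreshingCredentialsProvider
    implements AwsCredentialsProvider, Serializable {

  private static final Duration REFRESH_MARGIN = Duration.ofMinutes(5);

  private transient volatile AwsSessionCredentials cached;
  private transient volatile Instant expiresAt;

  // Fetch a new access key / secret key / session token from your credential source.
  protected abstract AwsSessionCredentials fetchFreshCredentials();

  // How long a freshly issued session token stays valid.
  protected abstract Duration sessionLifetime();

  @Override
  public synchronized AwsCredentials resolveCredentials() {
    if (cached == null || Instant.now().isAfter(expiresAt.minus(REFRESH_MARGIN))) {
      cached = fetchFreshCredentials();
      expiresAt = Instant.now().plus(sessionLifetime());
    }
    return cached;
  }
}

You would then pass an instance of it to 
pipelineOptions.as(AwsOptions.class).setAwsCredentialsProvider(...) as in your 
snippet. Note that pipeline options may be serialized when the job is submitted, 
so verify that your custom provider round-trips correctly on your runner.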

PS: Please use only the user@beam.apache.org mailing 
list for this type of question.

—
Alexey

> On 9 Jan 2024, at 19:39, Ramya Prasad via user  wrote:
> 
> Hello,
> 
> I am a developer trying to use Apache Beam in Java, and I am having an issue 
> with my AWS credentials expiring before my pipeline is done executing. I'm 
> limited by how I get my AWS credentials, as I set the credentials using the 
> StaticCredentialsProvider class. I set the credentials (which is an access 
> key, secret access key, and session token) in the PipelineOptions object 
> before I create the pipeline. However, I believe the PipelineOptions object 
> cannot be modified during runtime. I'm not sure how to refresh my credentials 
> so the pipeline doesn't break during execution.
> 
> Some code for how I set my credentials for reference:
> Credentials awsCreds = ;
> AwsCredentials credentials = 
> AwsSessionCredentials.create(awsCreds.getAccessKeyId(), 
> awsCreds.getSecretAccessKey(), awsCreds.getSessionToken());
> AwsCredentialsProvider provider = 
> StaticCredentialsProvider.create(credentials);
> pipelineOptions.as(AwsOptions.class).setAwsCredentialsProvider(provider);
> 
> 
> Any help would be appreciated!
> 
> Thanks and sincerely,
> Ramya
> 
> 
> 
> 



Re: Why do we need Job Server?

2023-12-05 Thread Alexey Romanenko
Oh, interesting. I didn’t know about that possibility, thanks

—
Alexey

> On 4 Dec 2023, at 18:14, Robert Bradshaw via user  
> wrote:
> 
> Note that this shouldn't be strictly necessary, e.g. for Python one can embed 
> the pipeline definition into the jar itself which is then just uploaded as an 
> "ordinary" flink executable jar to the Flink master: 
> https://github.com/apache/beam/blob/release-2.52.0/sdks/python/apache_beam/runners/portability/abstract_job_service.py#L301
>  If Java doesn't do this yet we should probably update it to do so. 
> 
> On Mon, Dec 4, 2023 at 7:10 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> There are two modes to run a job with FlinkRunner - Portable and Classic. If 
>> you run a job in Portable mode then you need to start a JobService, 
>> configured with your Flink cluster, and submit your job through it. If you 
>> run a job in Classic mode (Java SDK pipeline only) then you don’t need it.
>> 
>> More information on this is here:
>> Apache Flink Runner: https://beam.apache.org/documentation/runners/flink/
>> 
>> —
>> Alexey
>>  
>> 
>>> On 4 Dec 2023, at 07:53, Поротиков Станислав Вячеславович via user 
>>> <user@beam.apache.org> wrote:
>>> 
>>> Hello!
>>> I want to know which cases would lead me to use a separate job server for 
>>> submitting jobs to a Flink cluster?
>>> In which cases don't we need it at all?
>>>  
>>>  
>>> Best regards,
>>> Stanislav Porotikov
>> 



Re: Control who can submit Beam jobs

2023-11-30 Thread Alexey Romanenko
No, since Beam is not a runtime. In the end, it will create a Flink job and run 
it on a Flink cluster. So, it should be the responsibility of your Flink cluster.

—
Alexey

> On 30 Nov 2023, at 10:14, Поротиков Станислав Вячеславович via user 
>  wrote:
> 
> Hello!
> Is there any way to control who can submit jobs to a Flink cluster? We have 
> multiple teams and I am looking for a way to use Beam+Flink safely.
>  
> Best regards,
> Stanislav Porotikov



Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment

2023-11-09 Thread Alexey Romanenko
I already added Spark 3.5.0 to the Beam Spark version tests [1] and I 
didn’t notice any regression.

The next Beam release (2.53.0) should be available in a couple of months, 
depending on the release preparation process.

—
Alexey

[1] https://github.com/apache/beam/pull/29327

> On 9 Nov 2023, at 06:37, Giridhar Addepalli  wrote:
> 
> Thank you Alexey for sharing the details.
> 
> Can you please let us know if you are planning to add Spark 3.5.0 
> compatibility test as part of Beam 2.53.0 or not.
> If so, approximately what is the timeline we are looking at for Beam 2.53.0 
> release.
> https://github.com/apache/beam/milestone/17
> 
> Thanks,
> Giridhar.
> 
> On Tue, Nov 7, 2023 at 6:24 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> Hi Giridhar,
>> 
>>> On 4 Nov 2023, at 08:04, Giridhar Addepalli >> <mailto:giridhar1...@gmail.com>> wrote:
>>> 
>>>  Thank you Alexey for the response.
>>> 
>>> We are using Beam 2.41.0 with Spark 3.3.0 cluster. 
>>> We did not run into any issues. 
>>> is it because in Beam 2.41.0, compatibility tests were run against spark 
>>> 3.3.0 ?
>>> https://github.com/apache/beam/blob/release-2.41.0/runners/spark/3/build.gradle
>> 
>> Correct. 
>> There are some incompatibilities between Spark 3.1/3.2/3.3 versions and we 
>> fixed this for Spark runner in Beam 2.41 to make it possible to compile and 
>> run with different Spark versions. That was a goal of these compatibility 
>> tests.
>> 
>> Fixes #22156: Fix Spark3 runner to compile against Spark 3.2/3.3 by mosche · 
>> Pull Request #22157 · apache/beam: https://github.com/apache/beam/pull/22157
>> 
>>> If so, since compatibility tests were not run against Spark 3.5.0 even in 
>>> latest release of Beam 2.52.0, is it not advised to use Beam 2.52.0 with 
>>> Spark 3.5.0 cluster ?
>> 
>> I’d say, for now it's up to the user to test and run it since it was not tested 
>> on Beam CI. 
>> I’m going to add this version for future testing.
>> 
>> —
>> Alexey
>> 
>>> 
>>> Thanks,
>>> Giridhar.
>>> 
>>> On 2023/11/03 13:05:45 Alexey Romanenko wrote:
>>> > AFAICT, the latest tested (compatibility tests) version for now is 3.4.1 
>>> > [1] We may try to add 3.5.x version there.
>>> >  
>>> > I believe that ValidatesRunner tests are run only against the default Spark 
>>> > 3.2.2 version.
>>> >
>>> > —
>>> > Alexey
>>> >
>>> > [1] 
>>> > https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36
>>> >
>>> >
>>> > > On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman >> > > <mailto:sr...@gmail.com>> wrote:
>>> > >
>>> > > Does Apache Beam version (2.41.0)  or latest (2.51.0) support Spark 3.5 
>>> > > environment for spark runner ?
>>> > >
>>> > > Apache Beam - Spark Runner Documentation states -
>>> > > The Spark runner currently supports Spark’s 3.2.x branch
>>> > >
>>> > > Thanks
>>> > > Sri Ganesh V
>>> >
>>> > 
>> 



Re: Count(distinct) not working in beam sql

2023-11-03 Thread Alexey Romanenko
Unfortunately, Beam SQL doesn’t support the COUNT(DISTINCT) aggregation.

More details about the “why” are in this discussion [1] and in the related open 
issue here [2].

—
Alexey

[1] https://lists.apache.org/thread/hvmy6d5dls3m8xcnf74hfmy1xxfgj2xh
[2] https://github.com/apache/beam/issues/19398


> On 2 Nov 2023, at 20:52, Goutham Miryala  
> wrote:
> 
> Hey Team,
> 
> We're trying to implement an aggregation which involves several trillions of 
> rows using apache beam sql.
> However I'm getting an exception 
> Exception in thread "main" java.lang.UnsupportedOperationException: Does not 
> support COUNT DISTINCT
> 
> Here's the code for doing the aggregation:
> 
> PCollection aggregate = joinedCollection.apply("Aggregation",
> SqlTransform.query("SELECT" +
> "exchange_name as adexchange," +
> "strategy," +
> "platform," +
> "segment," +
> "auction_type," +
> "placement_type," +
> "country," +
> "COALESCE(loss, 0) AS loss_code," +
> "COUNT(DISTINCT identifier) AS uniques," +
> "no_bid_reason," +
> "SUM(1) AS auctions," +
> "SUM(CASE WHEN cpm_bid > 0 THEN 1 ELSE 0 END) AS 
> bids," +
> "SUM(cpm_bid) AS total_bid_price," +
> "SUM(CASE WHEN loss = 0 THEN 1 END) AS wins," +
> "app_bundle AS app_bundle," +
> "model_id AS model_id," +
> "identifier_type AS identifier_type," +
> "promotion_id AS promotion_id," +
> "sub_floor_bid_min_price_cohort AS 
> sub_floor_bid_min_price_cohort," +
> "bf_match_experiment AS bf_match_experiment," +
> "bep_matched_floor AS bep_matched_floor," +
> "SUM(p_ctr) AS p_ctr_total," +
> "SUM(p_ir) AS p_ir_total," +
> "SUM(p_cpa) AS p_cpa_total," +
> "SUM(arppu) AS arppu_total," +
> "SUM(spend) AS spend_total," +
> "SUM(cpm_price) AS cpm_price_total" +
> "FROM" +
> "PCOLLECTION" +
> "GROUP BY 
> exchange_name,strategy,platform,segment,auction_type" +
> ",placement_type,country,loss,no_bid_reason,app_bundle" +
> 
> ",model_id,identifier_type,promotion_id,sub_floor_bid_min_price_cohort" +
> ",bf_match_experiment,bep_matched_floor")
> );
> 
> Can you please guide us?
> 
> Let me know in case you need any more information.
> 
> Goutham Miryala
> Senior Data Engineer
> 
>  



Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment

2023-11-03 Thread Alexey Romanenko
AFAICT, the latest tested (compatibility tests) version for now is 3.4.1 [1]. We 
may try to add the 3.5.x version there.
 
I believe that ValidatesRunner tests are run only against the default Spark 3.2.2 
version.

—
Alexey

[1] 
https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36


> On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman 
>  wrote:
> 
> Does Apache Beam version (2.41.0)  or latest (2.51.0) support Spark 3.5 
> environment for spark runner ?
> 
> Apache Beam - Spark Runner Documentation states -
> The Spark runner currently supports Spark’s 3.2.x branch
> 
> Thanks
> Sri Ganesh V



Re: [DISCUSS] Drop Euphoria extension

2023-10-16 Thread Alexey Romanenko
Can we just deprecate it for a while and then remove it completely?

—
Alexey

> On 13 Oct 2023, at 18:59, Jan Lukavský  wrote:
> 
> Hi,
> 
> it has been some time since Euphoria extension [1] has been adopted by Beam 
> as a possible "Java 8 API". Beam has evolved from that time a lot, the 
> current API seems actually more elegant than the original Euphoria's and last 
> but not least, it has no maintainers and no known users. If there are any 
> users, please speak up!
> 
> Otherwise I'd like to propose to drop it from codebase, I'll start a vote 
> thread during next week, if there are no objections.
> 
> Best,
> 
>  Jan
> 
> [1] https://beam.apache.org/documentation/sdks/java/euphoria/
> 



Re: Cannot find a matching Beam FieldType for Calcite type: REAL

2023-10-13 Thread Alexey Romanenko
It seems like Calcite decided to use REAL for Float values in the SQL transform, 
while Beam SQL (if I’m not mistaken) doesn’t have a conversion from SQL REAL to any 
Beam schema field type.

A workaround could be to add such a conversion (REAL -> FLOAT) to 
CalciteUtils.java.

—
Alexey

> On 12 Oct 2023, at 20:19, Balogh, György  wrote:
> 
> Hi,
> I'm using beam 2.51.0
> I'm trying to use UDF to transform float arrays and got the following error:
> 
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot find a 
> matching Beam FieldType for Calcite type: REAL
> at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
> at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
> at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
> at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at 
> java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> at 
> org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
> at 
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
> at 
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
> at 
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
> at 
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
> at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
> at 
> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
> at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
> at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)
> 
> 
> This is my code:
> 
> public static class TestUDF implements SerializableFunction<List<Float>, Float> {
> @Override
> public Float apply(List<Float> fv) {
> float sum = 0;
> if (fv != null)
> for (Float a : fv)
> sum += a;
> return sum;
> }
> }
> 
> public static void example6() {
> System.out.println("Example 6");
> 
> Schema rowSchema = Schema.builder()
> .addField("ind", Schema.FieldType.INT32)
> .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
> .build();
> 
> Pipeline p = createPipeline();
> p
> .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
> .apply(ParDo.of(new DoFn<Integer, Row>() {
> @ProcessElement
> public void processElement(@Element Integer ind, 
> OutputReceiver<Row> out) {
> Row.Builder rowBuilder = Row.withSchema(rowSchema);
> List<Float> fv = new ArrayList<>();
> fv.add(1f * ind);
> fv.add(2f * ind);
> Row row = rowBuilder
> .addValue(ind)
> .addValue(fv)
> .build();
> out.output(row);
> }
> }))
> .setRowSchema(rowSchema)
> .apply(
> SqlTransform.query("select fv, testUDF(fv) from 
> PCOLLECTION")
> .registerUdf("testUDF", new TestUDF())
> )
> .apply(new Print());
> p.run().waitUntilFinish();
> }
> 
> 
> -- 
> 
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com 



Re: [Question] Does SnowflakeIO Connector in Java support Flink?

2023-10-13 Thread Alexey Romanenko
The rule of thumb in Beam is that all IO connectors (and other transforms) 
written in a given SDK are, by default, supported by all runners for that SDK 
(thanks to the Beam model).

Since both SnowflakeIO and FlinkRunner are written natively in Java, the answer 
to your question is YES, SnowflakeIO should be supported by FlinkRunner. Just 
build your project with the FlinkRunner dependency and specify this runner while 
running your pipeline (see the “Classic (Java)” examples here [1]).
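
For illustration, a minimal sketch of selecting FlinkRunner programmatically 
(normally you would just pass --runner=FlinkRunner and, if needed, 
--flinkMaster=... on the command line; the beam-runners-flink artifact matching 
your Flink version has to be on the classpath):

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunOnFlink {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Usually set via --runner=FlinkRunner; shown programmatically here.
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... apply SnowflakeIO.read()/write() and the rest of the pipeline here ...
    p.run().waitUntilFinish();
  }
}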

Another option, in case you need to use an IO connector of one SDK with a 
pipeline written in another SDK, is a cross-language pipeline. In this case, 
both the IO connector and the runner have to be portable, and then it may be run 
using the Beam portability framework (for example, a Python pipeline with Java IO 
connectors). Though, you don’t need that for your use case (SnowflakeIO with 
FlinkRunner) if you use only the Java SDK for your Beam pipeline.

You can find more details on supported IO connectors and SDKs here [2]. 

—
Alexey

[1] https://beam.apache.org/documentation/runners/flink/
[2] https://beam.apache.org/documentation/io/connectors/


> On 13 Oct 2023, at 03:40, mybeam  wrote:
> 
> Hello,
> 
> As per the document on 
> https://beam.apache.org/documentation/io/built-in/snowflake/#running-main-command-with-pipeline-options,
>  I can only see DirectRunner and DataflowRunner, and I have tested the 
> DirectRunner which is working fine. Just wonder if it supports Flink 
> officially. Any comments are welcomed. Thanks.
> 
> 



Re: "Decorator" pattern for PTramsforms

2023-09-18 Thread Alexey Romanenko
>>>>>> ... was more illustrative than the actual use case.
>>>>>> 
>>>>>> My actual use case is basically: I have multiple PTransforms, and let's 
>>>>>> say most of them average ~100 generated outputs for a single input. Most 
>>>>>> of these PTransforms will occasionally run into an input though that 
>>>>>> might output maybe 1M outputs. This can cause issues if for example 
>>>>>> there are transforms that follow it that require a lot of compute per 
>>>>>> input. 
>>>>>> 
>>>>>> The simplest way to deal with this is to modify the `DoFn`s in our 
>>>>>> Ptransforms and add a limiter in the logic (e.g. `if 
>>>>>> num_outputs_generated >= OUTPUTS_PER_INPUT_LIMIT: return`). We could 
>>>>>> duplicate this logic across our transforms, but it'd be much cleaner if 
>>>>>> we could lift up this limiting logic out of the application logic and 
>>>>>> have some generic wrapper that extends our transforms.
>>>>>> 
>>>>>> Thanks for the discussion!
>>>>>> 
>>>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko 
>>>>>> mailto:aromanenko@gmail.com>> wrote:
>>>>>>> I don’t think it’s possible to extend in a way that you are asking 
>>>>>>> (like, Java classes “extend"). Though, you can create your own 
>>>>>>> composite PTransform that will incorporate one or several others inside 
>>>>>>> “expand()” method. Actually, most of the Beam native PTransforms are 
>>>>>>> composite transforms. Please, take a look on doc and examples [1]
>>>>>>> 
>>>>>>> Regarding your example, please, be aware that all PTransforms are 
>>>>>>> supposed to be executed in distributed environment and the order of 
>>>>>>> records is not guaranteed. So, limiting the whole output by fixed 
>>>>>>> number of records can be challenging - you’d need to make sure that it 
>>>>>>> will be processed on only one worker, that means that you’d need to 
>>>>>>> shuffle all your records by the same key and probably sort the records 
>>>>>>> in way that you need.
>>>>>>> 
>>>>>>> Did you consider to use “org.apache.beam.sdk.transforms.Top” for that? 
>>>>>>> [2]
>>>>>>> 
>>>>>>> If it doesn’t work for you, could you provide more details of your use 
>>>>>>> case? Then we probably can propose the more suitable solutions for that.
>>>>>>> 
>>>>>>> [1] 
>>>>>>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>>>>>>> [2] 
>>>>>>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>>>>>> 
>>>>>>> —
>>>>>>> Alexey
>>>>>>> 
>>>>>>>> On 15 Sep 2023, at 14:22, Joey Tran >>>>>>> <mailto:joey.t...@schrodinger.com>> wrote:
>>>>>>>> 
>>>>>>>> Is there a way to extend already defined PTransforms? My question is 
>>>>>>>> probably better illustrated with an example. Let's say I have a 
>>>>>>>> PTransform that generates a very variable number of outputs. I'd like 
>>>>>>>> to "wrap" that PTransform such that if it ever creates more than say 
>>>>>>>> 1,000 outputs, then I just take the first 1,000 outputs without 
>>>>>>>> generating the rest of the outputs.
>>>>>>>> 
>>>>>>>> It'd be trivial if I have access to the DoFn, but what if the 
>>>>>>>> PTransform in question doesn't expose the `DoFn`?
>>>>>>> 



Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Alexey Romanenko
I don’t think it’s possible to extend in the way that you are asking (like Java 
classes “extend”). Though, you can create your own composite PTransform that 
incorporates one or several others inside its “expand()” method. Actually, most 
of the Beam native PTransforms are composite transforms. Please take a look at the 
doc and examples [1].
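
As an illustration of that idea, here is a hypothetical composite transform that 
wraps another transform and then caps the total number of emitted elements with 
Sample.any(). The class and step names are mine, not from Beam, and note that this 
caps the overall output, not the output per input element:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.PCollection;

public class CappedTransform<InT, OutT>
    extends PTransform<PCollection<InT>, PCollection<OutT>> {

  private final PTransform<PCollection<InT>, PCollection<OutT>> inner;
  private final long maxOutputs;

  public CappedTransform(
      PTransform<PCollection<InT>, PCollection<OutT>> inner, long maxOutputs) {
    this.inner = inner;
    this.maxOutputs = maxOutputs;
  }

  @Override
  public PCollection<OutT> expand(PCollection<InT> input) {
    // Apply the wrapped transform, then keep at most maxOutputs elements.
    return input
        .apply("Inner", inner)
        .apply("CapOutputs", Sample.<OutT>any(maxOutputs));
  }
}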

Regarding your example, please be aware that all PTransforms are supposed to 
be executed in a distributed environment and the order of records is not 
guaranteed. So, limiting the whole output to a fixed number of records can be 
challenging - you’d need to make sure that it is processed on only one 
worker, which means that you’d need to shuffle all your records by the same key 
and probably sort the records in the way that you need.

Did you consider to use “org.apache.beam.sdk.transforms.Top” for that? [2]

If it doesn’t work for you, could you provide more details of your use case? 
Then we can probably propose a more suitable solution for that.

[1] 
https://beam.apache.org/documentation/programming-guide/#composite-transforms
[2] 
https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html

—
Alexey

> On 15 Sep 2023, at 14:22, Joey Tran  wrote:
> 
> Is there a way to extend already defined PTransforms? My question is probably 
> better illustrated with an example. Let's say I have a PTransform that 
> generates a very variable number of outputs. I'd like to "wrap" that 
> PTransform such that if it ever creates more than say 1,000 outputs, then I 
> just take the first 1,000 outputs without generating the rest of the outputs.
> 
> It'd be trivial if I have access to the DoFn, but what if the PTransform in 
> question doesn't expose the `DoFn`?



Re: Can we use RedisIO to write records from an unbounded collection

2023-07-21 Thread Alexey Romanenko
Hi Sachin,


> On 21 Jul 2023, at 08:45, Sachin Mittal  wrote:
> 
> I was reading up on this IO here 
> https://beam.apache.org/documentation/io/connectors/ and it states that it 
> only supports batch and not streaming.

I believe that refers only to reading support. For writing, it mostly depends 
on the type of your pipeline (bounded or unbounded) and, if I’m not mistaken, every 
connector can be used with both types of pipelines.

Though, please be aware that Beam doesn’t guarantee the order of processed 
records. So, that should be taken into account when updating the same keys, 
for example.

> If this works then what is a better option to use write or writeStreams 
> function ?

I think it depends on how you are going to use Redis further downstream.

RedisIO.write() is the general way to write data into Redis and supports 
different methods (APPEND, SET, LPUSH, RPUSH, etc.).

RedisIO.writeStreams() is supposed to be used with Redis streams.
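
For illustration, a minimal sketch of the write() form (the endpoint values are 
placeholders, and a bounded Create source is used only to keep the snippet 
self-contained - an unbounded source such as KafkaIO would be written the same way):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class WriteToRedisExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of(KV.of("key1", "value1"), KV.of("key2", "value2")))
        .apply("WriteToRedis",
            RedisIO.write()
                .withEndpoint("redis-host", 6379)
                .withMethod(RedisIO.Write.Method.SET));
    p.run().waitUntilFinish();
  }
}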

—
Alexey

Re: [Question] Processing chunks of data in batch based pipelines

2023-07-18 Thread Alexey Romanenko
Reading from an RDBMS and processing the data downstream of your pipeline are not 
the same in terms of bundling.

The main “issue” with the former is that it mostly reads in a single thread per 
SQL query, and the JDBC client is no exception. So, Beam can’t split data that 
has not yet been read into bundles.

How many records it will keep in memory is up to the corresponding JDBC driver, 
and “fetch size” is just a hint for it. So, it depends on the driver, not on Beam, 
in this case.

As a workaround, as I suggested above, a “partitioned” version of read was 
developed for JdbcIO; it has some weaknesses but may help in some situations.
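
For illustration, a minimal sketch of that partitioned read (driver, URL, table 
and column names are placeholders, "pipeline" is an existing Pipeline instance, 
and method names are as in the 2.4x JdbcIO API). Each partition issues its own 
ranged query over the numeric partition column, so the read is split across 
workers instead of running in a single thread:

import java.sql.ResultSet;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> rows =
    pipeline.apply(
        JdbcIO.<String>readWithPartitions()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/mydb")
                    .withUsername("user")
                    .withPassword("password"))
            .withTable("my_table")
            .withPartitionColumn("id")
            .withNumPartitions(10)
            .withRowMapper(
                new JdbcIO.RowMapper<String>() {
                  @Override
                  public String mapRow(ResultSet resultSet) throws Exception {
                    return resultSet.getString("payload");
                  }
                }));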

—
Alexey

> On 17 Jul 2023, at 20:29, Yomal de Silva  wrote:
> 
> Hi Alexey,
> 
> Yes, I have tried changing the fetch size for my implementation. What I 
> observed through the Flink dashboard was the reading transform gets completed 
> quickly and one of the other transforms takes a much longer time (due to some 
> logic). 
> 
> Even if Apache Beam processes data in bundles when reading from a data source 
> like a database it would not wait till a single bundle reaches the end of the 
> pipeline. Is that understanding correct? So it will eventually read the 
> entire dataset, loading it into memory. 
> 
> I haven't tried the 2nd option you suggested. Will try it out. 
> 
> Thank you
> 
> On Mon, Jul 17, 2023 at 10:08 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> Hi Yomal,
>> 
>> Actually, usually all data in Beam pipeline is processed by bundles (or 
>> chunks) if it processed by DoFn. The size of the bundle is up to your 
>> processing engine and, iirc, there is no way in Beam to change it.
>> 
>> Talking about your case -  did you try to change a fetch size for Beam’s 
>> JdbcIO connector or for your own one?
>> Normally, it just gives a hint for the JDBC driver as to the number of rows 
>> that should be fetched from the database [1].
>> 
>> Another option could be to try to read data with JdbcIO.readWithPartitions() 
>> that will execute several instances of the query on the same table
>> using ranges [2].
>> 
>> —
>> Alexey
>> 
>> [1] 
>> https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
>> [2] 
>> https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-
>> 
>>> On 17 Jul 2023, at 13:33, Yomal de Silva >> <mailto:yomal.prav...@gmail.com>> wrote:
>>> 
>>> Hi all,
>>> 
>>> I have a pipeline which reads data from a database(postgresql), enrich the 
>>> data through a side input and finally publish the results to Kafka. 
>>> Currently I am not using the builtin JDBCIO to read the data but I think 
>>> there wont be any difference in using that. With my implementation I have 
>>> set the fetchsize and pass the data to the next transform to process. I 
>>> have 2 questions here,
>>> 
>>> 1. For batch based processing pipelines is there a way to process elements 
>>> in chunks rather than reading the entire dataset and loading that to 
>>> memory? What I have observed is that it occupies a significant amount of 
>>> memory and may even cause OOM exceptions. I am looking for sort of a 
>>> backpressure implementation or any other way to stop reading all the data 
>>> into memory until some of the records gets processed. I have found the 
>>> following answer [1] which states thats not possible, since this answer was 
>>> provided some time ago wanted to check if it is still the case.
>>> 
>>> 2. When dealing with side inputs, again does it loads everything into 
>>> memory and use the appropriate window to carry out the operation inside a 
>>> transform? 
>>> 
>>> Please let me know if you have any solutions for this. 
>>> 
>>> [1] 
>>> https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam
>>> 
>>> Thank you.
>> 



Re: [Question] Processing chunks of data in batch based pipelines

2023-07-17 Thread Alexey Romanenko
Hi Yomal,

Actually, all data in a Beam pipeline is usually processed in bundles (or chunks) 
if it is processed by a DoFn. The size of the bundle is up to your processing engine 
and, iirc, there is no way in Beam to change it.

Talking about your case - did you try to change the fetch size for Beam’s JdbcIO 
connector or for your own one?
Normally, it just gives a hint to the JDBC driver as to the number of rows 
that should be fetched from the database [1].

Another option could be to try to read data with JdbcIO.readWithPartitions() 
that will execute several instances of the query on the same table
using ranges [2].

—
Alexey

[1] 
https://github.com/apache/beam/blob/c8f68f92097de33fe2c6863344404a1b9922ae27/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1524
[2] 
https://beam.apache.org/releases/javadoc/2.49.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html#readWithPartitions-org.apache.beam.sdk.values.TypeDescriptor-

> On 17 Jul 2023, at 13:33, Yomal de Silva  wrote:
> 
> Hi all,
> 
> I have a pipeline which reads data from a database(postgresql), enrich the 
> data through a side input and finally publish the results to Kafka. Currently 
> I am not using the builtin JDBCIO to read the data but I think there wont be 
> any difference in using that. With my implementation I have set the fetchsize 
> and pass the data to the next transform to process. I have 2 questions here,
> 
> 1. For batch based processing pipelines is there a way to process elements in 
> chunks rather than reading the entire dataset and loading that to memory? 
> What I have observed is that it occupies a significant amount of memory and 
> may even cause OOM exceptions. I am looking for sort of a backpressure 
> implementation or any other way to stop reading all the data into memory 
> until some of the records gets processed. I have found the following answer 
> [1] which states thats not possible, since this answer was provided some time 
> ago wanted to check if it is still the case.
> 
> 2. When dealing with side inputs, again does it loads everything into memory 
> and use the appropriate window to carry out the operation inside a transform? 
> 
> Please let me know if you have any solutions for this. 
> 
> [1] 
> https://stackoverflow.com/questions/57580362/how-to-manage-backpressure-with-apache-beam
> 
> Thank you.



Re: Getting Started With Implementing a Runner

2023-06-23 Thread Alexey Romanenko


> On 23 Jun 2023, at 17:40, Robert Bradshaw via user  
> wrote:
> 
> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> If Beam Runner Authoring Guide is rather high-level for you, then, at fist, 
>> I’d suggest to answer two questions for yourself:
>> - Am I going to implement a portable runner or native one?
> 
> 
> The answer to this should be portable, as non-portable ones will be 
> deprecated.

Well, actually this is a question that I don’t remember us discussing here in 
detail before or reaching a common agreement on.

Actually, I’m not sure that I clearly understand what is meant by “deprecation” 
in this case. For example, the portable Spark runner is actually heavily based on 
the native Spark RDD runner and its translations. So, which part should be 
deprecated, and what is the reason for that?

Well, anyway I guess it’s off topic here.

Also, we don’t know if this new runner will be contributed back to Beam, what 
its runtime is, and what its final goal actually is. 
So I agree that more details on this would be useful.

—
Alexey

> 
>> - Which SDK I should use for this runner?
> 
> 
> The answer to the above question makes this one moot :).
> 
> On a more serious note, could you tell us a bit more about the runner you're 
> looking at implementing?
> 
>> 
>> Then, depending on answers, I’d suggest to take as an example one of the 
>> most similar Beam runners and use it as a more detailed source of 
>> information along with Beam runner doc mentioned before.
>> 
>> —
>> Alexey
>> 
>>> On 22 Jun 2023, at 14:39, Joey Tran >> <mailto:joey.t...@schrodinger.com>> wrote:
>>> 
>>> Hi Beam community!
>>> 
>>> I'm interested in trying to implement a runner with my company's execution 
>>> environment but I'm struggling to get started. I've read the docs page 
>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> on 
>>> implementing a runner but it's quite high level. Anyone have any concrete 
>>> suggestions on getting started?
>>> 
>>> I've started by cloning and running the hello world example 
>>> <https://github.com/apache/beam-starter-python>. I've then subclassed 
>>> `PipelineRunner 
>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>  to create my own custom runner but at this point I'm a bit stuck. My 
>>> custom runner just looks like
>>> 
>>> class CustomRunner(runner.PipelineRunner):
>>> def run_pipeline(self, pipeline,
>>>  options):
>>> self.visit_transforms(pipeline, options)
>>> 
>>> And when using it I get an error about not having implemented "Impulse"
>>> 
>>> NotImplementedError: Execution of [] 
>>> not implemented in runner .
>>> 
>>> Am I going about this the right way? Are there tests I can run my custom 
>>> runner against to validate it beyond just running the hello world example? 
>>> I'm finding myself just digging through the beam source to try to piece 
>>> together how a runner works and I'm struggling to get a foothold. Any 
>>> guidance would be greatly appreciated, especially if anyone has any 
>>> experience implementing their own python runner.
>>> 
>>> Thanks in advance! Also, could I get a Slack invite?
>>> Cheers,
>>> Joey
>> 



Re: Getting Started With Implementing a Runner

2023-06-23 Thread Alexey Romanenko
If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d 
suggest answering two questions for yourself:
- Am I going to implement a portable runner or a native one?
- Which SDK should I use for this runner?

Then, depending on the answers, I’d suggest taking one of the most similar Beam 
runners as an example and using it as a more detailed source of information along 
with the Beam runner doc mentioned before.

—
Alexey

> On 22 Jun 2023, at 14:39, Joey Tran  wrote:
> 
> Hi Beam community!
> 
> I'm interested in trying to implement a runner with my company's execution 
> environment but I'm struggling to get started. I've read the docs page 
> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> on 
> implementing a runner but it's quite high level. Anyone have any concrete 
> suggestions on getting started?
> 
> I've started by cloning and running the hello world example 
> <https://github.com/apache/beam-starter-python>. I've then subclassed 
> `PipelineRunner 
> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>  to create my own custom runner but at this point I'm a bit stuck. My custom 
> runner just looks like
> 
> class CustomRunner(runner.PipelineRunner):
> def run_pipeline(self, pipeline,
>  options):
> self.visit_transforms(pipeline, options)
> 
> And when using it I get an error about not having implemented "Impulse"
> 
> NotImplementedError: Execution of [] not 
> implemented in runner .
> 
> Am I going about this the right way? Are there tests I can run my custom 
> runner against to validate it beyond just running the hello world example? 
> I'm finding myself just digging through the beam source to try to piece 
> together how a runner works and I'm struggling to get a foothold. Any 
> guidance would be greatly appreciated, especially if anyone has any 
> experience implementing their own python runner.
> 
> Thanks in advance! Also, could I get a Slack invite?
> Cheers,
> Joey



Re: Beam SQL found limitations

2023-05-22 Thread Alexey Romanenko
Hi Piotr,

Thanks for the details! I’m cross-posting this to dev@ as well since, I guess, people 
there can provide more insight on this.

A while ago, I faced similar issues trying to run Beam SQL against the TPC-DS 
benchmark. 
We had a discussion around that [1]; please take a look since it can be 
helpful.

[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

—
Alexey 

> On 18 May 2023, at 11:36, Wiśniowski Piotr 
>  wrote:
> 
> HI,
> 
> After experimenting with Beam SQL I did find some limitations. Testing on 
> near latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with 
> Calcite, direct runner and openjdk version "11.0.19". Please let me know if 
> some of them are known/ worked on/ have tickets or have estimated fix time. I 
> believe most of them are low hanging fruits or just my thinking is not right 
> for the problem. If this is the case please guide me to some working solution.
> 
>  From my perspective it is ok to have a fix just on master - no need to wait 
> for release. Priority order: 
> - 7. Windowing function on a stream - in detail - How to get previous message 
> for a key? setting expiration arbitrary big is ok, but access to the previous 
> record must happen fairly quickly not wait for the big window to finish and 
> emit the expired keys. Ideally would like to do it in pure beam pipeline as 
> saving to some external key/value store and then reading this here could 
> potentially result in some race conditions which in I would like to avoid, 
> but if its the only option - let it be.
> - 5. single UNION ALL possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is Row type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if one 
> is a static number table
> - 2. ROW construction not supported. It is not possible to nest data
> 
> Below queries tat I use to testing this scenarios.
> 
> Thank You for looking at this topics!
> 
> Best
> 
> Wiśniowski Piotr
> 
> ---
> -- 1. `CROSS JOIN` between two unrelated tables is not supported. 
> ---
> -- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
> -- It is not possible to number rows
> WITH data_table AS (
> SELECT 1 AS a
> ),
> number_table AS (
> SELECT 
> numbers_exploded AS number_item
> FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS 
> numbers_exploded
> )
> SELECT 
> data_table.a,
> number_table.number_item
> FROM data_table
> CROSS JOIN number_table
> ;
> -- CROSS JOIN, JOIN ON FALSE is not supported!
> 
> 
> ---
> -- 2. ROW construction not supported. It is not possible to nest data
> ---
> SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0 
> SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
> SELECT MAP['field1','b','field2','a']; -- null
> -- WORKAROUND - manually compose json string, 
> -- drawback - decomposing might be not supported or would need to be also 
> based on string operations
> SELECT ('{"field1":"' || 1 || '","field2":"' || 'a' || '"}') AS `json_object`;
> 
> 
> ---
> -- 3. Using * when there is Row type present in the schema
> ---
> CREATE EXTERNAL TABLE test_tmp_1(
> `ref` VARCHAR,
> `author` ROW<
> `name` VARCHAR,
> `email` VARCHAR
> >
> )
> TYPE text
> LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", 
> "deadLetterFile":"top/python/dbt/tests/dead"}';
> SELECT * FROM test_tmp_1;
> --  java.lang.NoSuchFieldException: name 
> -- WORKAROUND - refer to columns explicitly with alias
> SELECT 
> `ref` AS ref_value, 
> test_tmp_1.`author`.`name` AS author_name, -- table name must be 
> referenced explicitly - this could be fixed too
> test_tmp_1.`author`.`email` AS author_name
> FROM test_tmp_1;
> 
> 
> ---
> -- 4. UNNEST ARRAY with nested ROW
> ---
> CREATE EXTERNAL TABLE test_tmp(
> `ref` VARCHAR,
> `commits` ARRAY<ROW<
> `id` VARCHAR,
> `author` ROW<
> `name` VARCHAR,
> `email` VARCHAR
> >
> >>
> )
> TYPE text
> LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
> TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
> SELECT
> test_tmp.`ref` AS branch_name,
> commit_item.`id` AS commit_hash,
> commit_item.`author`.`name` AS author_name
> FROM test_tmp
> CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
> -- Row expected 4 fields (Field{name=ref, description=, type=STRING, 
> options={{}}}, Field{name=commits, description=, type=ARRAY author ROW> NOT NULL>, options={{}}}, 
> Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, 
> description=, type=ROW, options={{}}}). 
> initializ

Re: [java] Trouble with gradle and using ParquetIO

2023-04-26 Thread Alexey Romanenko
No, I don’t think so, since, iirc, the hadoop dependencies were intentionally made 
“provided” to support different Hadoop versions and leave it "up to the user" to 
decide which version to use.

—
Alexey

> On 26 Apr 2023, at 05:51, Evan Galpin  wrote:
> 
> The root cause was actually   "java.lang.ClassNotFoundException: 
> org.apache.hadoop.io.Writable" which I eventually fixed by including 
> hadoop-common as a dep for my pipeline (below).  Should hadoop-common be 
> listed as a dep of ParquetIO in the beam repo itself? 
> 
> implementation "org.apache.hadoop:hadoop-common:3.2.4"
> 
> On Fri, Apr 21, 2023 at 10:38 AM Evan Galpin  <mailto:egal...@apache.org>> wrote:
>> Oops, I was looking at the "bootleg" mvnrepository search engine, which 
>> shows `compileOnly` in the copy-pastable dependency installation prompts[1]. 
>>  When I received the "ClassNotFound" error, my thought was that the dep 
>> should be installed in "implementation" mode.  When I tried that, I get 
>> other more strange errors when I try to run my pipeline: 
>> "java.lang.NoClassDefFoundError: Could not initialize class 
>> org.apache.beam.sdk.coders.CoderRegistry".
>> 
>> My deps are like so:
>> implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
>> implementation "org.apache.beam:beam-sdks-java-io-parquet:${beamVersion}"
>> ...
>> 
>> Not sure why the CoderRegistry error comes up at runtime when both of the 
>> above deps are included. 
>> 
>> [1] 
>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0
>> 
>> On Fri, Apr 21, 2023 at 2:34 AM Alexey Romanenko > <mailto:aromanenko@gmail.com>> wrote:
>>> Just curious. where it was documented like this?
>>> 
>>> I briefly checked it on Maven Central [1] and the provided code snippet for 
>>> Gradle uses “implementation” scope.
>>> 
>>> —
>>> Alexey
>>> 
>>> [1] 
>>> https://search.maven.org/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0/jar
>>> 
>>> > On 21 Apr 2023, at 01:52, Evan Galpin >> > <mailto:egal...@apache.org>> wrote:
>>> > 
>>> > Hi all,
>>> > 
>>> > I'm trying to make use of ParquetIO.  Based on what's documented in maven 
>>> > central, I'm including the artifact in "compileOnly" mode (or in maven 
>>> > parlance, 'provided' scope).  I can successfully compile my pipeline, but 
>>> > when I run it I (intuitively?) am met with a ClassNotFound exception for 
>>> > ParquetIO.
>>> > 
>>> > Is 'compileOnly' still the desired way to include ParquetIO as a pipeline 
>>> > dependency? 
>>> > 
>>> > Thanks,
>>> > Evan
>>> 



Re: [java] Trouble with gradle and using ParquetIO

2023-04-21 Thread Alexey Romanenko
Just curious: where was it documented like this?

I briefly checked it on Maven Central [1] and the provided code snippet for 
Gradle uses the “implementation” scope.

—
Alexey

[1] 
https://search.maven.org/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0/jar

> On 21 Apr 2023, at 01:52, Evan Galpin  wrote:
> 
> Hi all,
> 
> I'm trying to make use of ParquetIO.  Based on what's documented in maven 
> central, I'm including the artifact in "compileOnly" mode (or in maven 
> parlance, 'provided' scope).  I can successfully compile my pipeline, but 
> when I run it I (intuitively?) am met with a ClassNotFound exception for 
> ParquetIO.
> 
> Is 'compileOnly' still the desired way to include ParquetIO as a pipeline 
> dependency? 
> 
> Thanks,
> Evan



Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-20 Thread Alexey Romanenko
Some Java IO connectors implement a class something like "class ReadAll extends 
PTransform<PCollection<Read>, PCollection<Result>>” where “Read” is 
supposed to be configured dynamically. As a simple example, take a look at 
“SolrIO” [1].

So, to support what you are looking for, the “ReadAll” pattern would need to be 
implemented for ElasticsearchIO.
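
For illustration, a hypothetical sketch of what such a “ReadAll”-style composite 
transform could look like. The class name and the executeQuery() hook are 
placeholders of mine, not Beam or Elasticsearch APIs - you would back 
executeQuery() with an Elasticsearch client call:

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public abstract class ReadAllQueries
    extends PTransform<PCollection<String>, PCollection<String>> {

  // Run one query against the cluster and return the matching documents as JSON strings.
  protected abstract List<String> executeQuery(String query);

  @Override
  public PCollection<String> expand(PCollection<String> queries) {
    return queries.apply(
        "ExecuteQueries",
        ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void processElement(@Element String query, OutputReceiver<String> out) {
                for (String doc : executeQuery(query)) {
                  out.output(doc);
                }
              }
            }));
  }
}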

—
Alexey

[1] 
https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java

> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user  
> wrote:
> 
> I'm running into an issue using the ElasticsearchIO.read() to handle more 
> than one instance of a query. My queries are being dynamically built as a 
> PCollection based on an incoming group of values. I'm trying to see how to 
> load the .withQuery() parameter which could provide this capability or any 
> approach that provides flexibility.
>  
> The issue is that ElasticsearchIO.read() method expects a PBegin input to 
> start a pipeline, but it seems like I need access outside of a pipeline 
> context somehow. PBegin represents the beginning of a pipeline, and it's 
> required to create a pipeline that can read data from Elasticsearch using 
> IOElasticsearchIO.read().
>  
> Can I wrap the ElasticsearchIO.read() call in a Create transform that creates 
> a PCollection with a single element (e.g., PBegin) to simulate the beginning 
> of a pipeline or something similar?
>  
> Here is my naive attempt without accepting the reality of PBegin:
>PCollection queries = ... // a PCollection of Elasticsearch queries
>
> PCollection queryResults = queries.apply(
> ParDo.of(new DoFn() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> String query = c.element();
> PCollection results = c.pipeline()
> .apply(ElasticsearchIO.read()
> .withConnectionConfiguration(
> 
> ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
> .withQuery(query));
> c.output(results);
> }
> })
> .apply(Flatten.pCollections()));
>  
>  
> In general I'm wondering for any of IO-related classes proved by Beam that 
> conforms to PBegin input -- if there is a means to introduce a collection.
> 
>  
> 
> Here is one approach that might be promising:
> 
> // Define a ValueProvider for a List
> ValueProvider> myListProvider = 
> ValueProvider.StaticValueProvider.of(myList);
>  
> // Use the ValueProvider to create a PCollection of Strings
> PCollection pcoll = pipeline.apply(Create.ofProvider(myListProvider, 
> ListCoder.of()));
>  
> PCollection partitionData = PBegin.in(pipeline)
> .apply("Read data from Elasticsearch", 
> ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider
>  pcoll).withScrollKeepalive("1m").withBatchSize(50))
> .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(), 
> opt.getMedTagVersion(), opt.getNoteType()));
>  
> Any thoughts or ideas would be great.   Thanks, ~Sean



Re: ClassCastException when I have bytes type in row

2023-04-18 Thread Alexey Romanenko
Hi Jeff,

Sorry for delay with an answer.

Could you give more details (e.g. your pipeline and SQL query code snippets) 
about how to reproduce this issue? 
Which Beam version do you use?

—
Alexey 

> On 30 Mar 2023, at 03:21, Jeff Zhang  wrote:
> 
> 
> Hi, folks,
> 
> I asked this question in the beam slack channel, but there seems to be no 
> comments, so I would like to ask it in the user mail list again. Here's the 
> problem I hit, I use SqlTransform to process my data, this following 
> ClassCastException happens when my data has bytes type. It seems like a bug 
> to me. Can anyone help confirm that? Thanks
> 
> 
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.vendor.calcite.v1_28_0.org 
> .apache.calcite.avatica.util.ByteString cannot be cast 
> to [B
>   at SC.eval0(Unknown Source)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.codehaus.janino.ScriptEvaluator.evaluate(ScriptEvaluator.java:798)
>   at 
> org.codehaus.janino.ScriptEvaluator.evaluate(ScriptEvaluator.java:790)
>   at 
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$CalcFn.processElement(BeamCalcRel.java:316)
>   at 
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$CalcFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
>   at 
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
>   at 
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
>   at 
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>   at 
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>   at 
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>   at 
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>   at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>  (edited)
> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang



Re: major reduction is performance when using schema registry - KafkaIO

2023-04-13 Thread Alexey Romanenko
Thanks for testing this!

It requires some additional investigation, so I created an issue for that: 
https://github.com/apache/beam/issues/26262

Feel free to add more details if you have there.

—
Alexey

> On 13 Apr 2023, at 12:45, Sigalit Eliazov  wrote:
> 
> I have made the suggested change and used 
> ConfluentSchemaRegistryDeserializerProvider
> the results are slightly  better.. average of 8000 msg/sec 
> 
> Thank you both for your response and i'll appreciate if you can keep me in 
> the loop in the planned work with kafka schema or let me know if i can assist 
> in anyway,
> 
> Thanks
> Sigalit
> 
> On Wed, Apr 12, 2023 at 8:00 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> Mine was the similar but 
>> "org.apache.beam.sdk.io.kafka,ConfluentSchemaRegistryDeserializerProvider" 
>> is leveraging 
>> “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient” that I 
>> guessed should reduce this potential impact.
>> 
>> —
>> Alexey
>> 
>>> On 12 Apr 2023, at 17:36, John Casey via user >> <mailto:user@beam.apache.org>> wrote:
>>> 
>>> My initial guess is that there are queries being made in order to retrieve 
>>> the schemas, which would impact performance, especially if those queries 
>>> aren't cached with Beam splitting in mind. 
>>> 
>>> I'm looking to improve our interaction with Kafka schemas in the next 
>>> couple of quarters, so I'll keep this case in mind while working on that.
>>> 
>>> John
>>> 
>>> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko >> <mailto:aromanenko@gmail.com>> wrote:
>>>> I don’t have an exact answer why it’s so much slower for now (only some 
>>>> guesses but it requires some profiling), though could you try to test the 
>>>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider” 
>>>> instead of KafkaAvroDeserializer and AvroCoder?
>>>> 
>>>> More details and an example how to use is here:
>>>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>>>>  (go to “Use Avro schema with Confluent Schema Registry”)
>>>> 
>>>> —
>>>> Alexey
>>>> 
>>>> 
>>>> 
>>>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov >>>> <mailto:e.siga...@gmail.com>> wrote:
>>>>> 
>>>>> hi,
>>>>> KafkaIO.read()
>>>>> .withBootstrapServers(bootstrapServers)
>>>>> .withTopic(topic)
>>>>> .withConsumerConfigUpdates(Map.ofEntries(
>>>>> Map.entry("schema.registry.url", registryURL),
>>>>> Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ 
>>>>> UUID.randomUUID()),
>>>>> ))
>>>>> .withKeyDeserializer(StringDeserializer.class)
>>>>> .withValueDeserializerAndCoder((Class) 
>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer.class, 
>>>>> AvroCoder.of(avroClass));
>>>>> 
>>>>> Thanks
>>>>> Sigalit
>>>>> 
>>>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user >>>> <mailto:user@beam.apache.org>> wrote:
>>>>>> How are you using the schema registry? Do you have a code sample?
>>>>>> 
>>>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov >>>>> <mailto:e.siga...@gmail.com>> wrote:
>>>>>>> Hello,
>>>>>>> 
>>>>>>> I am trying to understand the effect of schema registry on our 
>>>>>>> pipeline's performance. In order to do sowe created a very simple 
>>>>>>> pipeline that reads from kafka, runs a simple transformation of adding 
>>>>>>> new field and writes of kafka.  the messages are in avro format
>>>>>>> 
>>>>>>> I ran this pipeline with 3 different options on same configuration : 1 
>>>>>>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>>>> 
>>>>>>> * when i used apicurio as the schema registry i was able to process 
>>>>>>> only 2000 messages per second
>>>>>>> * when i used confluent schema registry i was able to process 7000 
>>>>>>> messages per second
>>>>>>> * when I did not use any schema registry and used plain avro 
>>>>>>> deserializer/serializer i was able to process 30K messages per second.
>>>>>>> 
>>>>>>> I understand that using a schema registry may cause a reduction in 
>>>>>>> performance but  in my opinion the difference is too high. 
>>>>>>> Any comments or suggestions about these results?
>>>>>>> 
>>>>>>> Thanks in advance
>>>>>>> Sigalit
>>>> 
>> 



Re: major reduction is performance when using schema registry - KafkaIO

2023-04-12 Thread Alexey Romanenko
My guess was similar, but 
"org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider" is 
leveraging 
“io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient”, which I 
guessed should reduce this potential impact.

—
Alexey

> On 12 Apr 2023, at 17:36, John Casey via user  wrote:
> 
> My initial guess is that there are queries being made in order to retrieve 
> the schemas, which would impact performance, especially if those queries 
> aren't cached with Beam splitting in mind. 
> 
> I'm looking to improve our interaction with Kafka schemas in the next couple 
> of quarters, so I'll keep this case in mind while working on that.
> 
> John
> 
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> I don’t have an exact answer why it’s so much slower for now (only some 
>> guesses but it requires some profiling), though could you try to test the 
>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider” 
>> instead of KafkaAvroDeserializer and AvroCoder?
>> 
>> More details and an example how to use is here:
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>>  (go to “Use Avro schema with Confluent Schema Registry”)
>> 
>> —
>> Alexey
>> 
>> 
>> 
>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov >> <mailto:e.siga...@gmail.com>> wrote:
>>> 
>>> hi,
>>> KafkaIO.read()
>>> .withBootstrapServers(bootstrapServers)
>>> .withTopic(topic)
>>> .withConsumerConfigUpdates(Map.ofEntries(
>>> Map.entry("schema.registry.url", registryURL),
>>> Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ 
>>> UUID.randomUUID()),
>>> ))
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializerAndCoder((Class) 
>>> io.confluent.kafka.serializers.KafkaAvroDeserializer.class, 
>>> AvroCoder.of(avroClass));
>>> 
>>> Thanks
>>> Sigalit
>>> 
>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user >> <mailto:user@beam.apache.org>> wrote:
>>>> How are you using the schema registry? Do you have a code sample?
>>>> 
>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov >>> <mailto:e.siga...@gmail.com>> wrote:
>>>>> Hello,
>>>>> 
>>>>> I am trying to understand the effect of schema registry on our pipeline's 
>>>>> performance. In order to do sowe created a very simple pipeline that 
>>>>> reads from kafka, runs a simple transformation of adding new field and 
>>>>> writes of kafka.  the messages are in avro format
>>>>> 
>>>>> I ran this pipeline with 3 different options on same configuration : 1 
>>>>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>> 
>>>>> * when i used apicurio as the schema registry i was able to process only 
>>>>> 2000 messages per second
>>>>> * when i used confluent schema registry i was able to process 7000 
>>>>> messages per second
>>>>> * when I did not use any schema registry and used plain avro 
>>>>> deserializer/serializer i was able to process 30K messages per second.
>>>>> 
>>>>> I understand that using a schema registry may cause a reduction in 
>>>>> performance but  in my opinion the difference is too high. 
>>>>> Any comments or suggestions about these results?
>>>>> 
>>>>> Thanks in advance
>>>>> Sigalit
>> 



Re: major reduction is performance when using schema registry - KafkaIO

2023-04-11 Thread Alexey Romanenko
I don’t have an exact answer for why it’s so much slower for now (only some guesses, 
and it requires some profiling), but could you try to test the same Kafka 
read with “ConfluentSchemaRegistryDeserializerProvider” instead of 
KafkaAvroDeserializer and AvroCoder?

More details and an example of how to use it are here:
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
 (go to “Use Avro schema with Confluent Schema Registry”)
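
For illustration, a read using it could look roughly like this (topic, subject and 
registry URL are placeholders):

  KafkaIO.<String, GenericRecord>read()
      .withBootstrapServers(bootstrapServers)
      .withTopic("my-topic")
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(
          ConfluentSchemaRegistryDeserializerProvider.of(
              "http://schema-registry:8081", "my-topic-value"));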

—
Alexey



> On 10 Apr 2023, at 07:35, Sigalit Eliazov  wrote:
> 
> hi,
> KafkaIO.read()
> .withBootstrapServers(bootstrapServers)
> .withTopic(topic)
> .withConsumerConfigUpdates(Map.ofEntries(
> Map.entry("schema.registry.url", registryURL),
> Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ 
> UUID.randomUUID()),
> ))
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializerAndCoder((Class) 
> io.confluent.kafka.serializers.KafkaAvroDeserializer.class, 
> AvroCoder.of(avroClass));
> 
> Thanks
> Sigalit
> 
> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user  > wrote:
>> How are you using the schema registry? Do you have a code sample?
>> 
>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov > > wrote:
>>> Hello,
>>> 
>>> I am trying to understand the effect of schema registry on our pipeline's 
>>> performance. In order to do so we created a very simple pipeline that reads 
>>> from kafka, runs a simple transformation of adding a new field and writes to 
>>> kafka.  the messages are in avro format
>>> 
>>> I ran this pipeline with 3 different options on same configuration : 1 
>>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>> 
>>> * when i used apicurio as the schema registry i was able to process only 
>>> 2000 messages per second
>>> * when i used confluent schema registry i was able to process 7000 messages 
>>> per second
>>> * when I did not use any schema registry and used plain avro 
>>> deserializer/serializer i was able to process 30K messages per second.
>>> 
>>> I understand that using a schema registry may cause a reduction in 
>>> performance but  in my opinion the difference is too high. 
>>> Any comments or suggestions about these results?
>>> 
>>> Thanks in advance
>>> Sigalit



Re: Recommended way to set coder for JdbcIO with Apache Beam

2023-03-27 Thread Alexey Romanenko
Hmm, it worked fine for me on a primitive pipeline with the default AvroCoder for 
a simple InboundData implementation.

Could you create a GitHub issue for that 
(https://github.com/apache/beam/issues)? It would be very helpful to provide 
the steps there how to reproduce this issue.

Thanks,
Alexey

> On 23 Mar 2023, at 19:08, Juan Cuzmar  wrote:
> 
> unfortunately didn't work.
> I added the @DefaultCoder as you told me and i removed the .withCoder from my 
> JdbcIO pipe and showed the same error:
> 
> java.lang.IllegalStateException: Unable to infer a coder for JdbcIO.readAll() 
> transform. Provide a coder via withCoder, or ensure that one can be inferred 
> from the provided RowMapper.
> 
> 
> 
> 
> Juan Cuzmar.
> 
> --- Original Message ---
> On Thursday, March 23rd, 2023 at 2:17 PM, Alexey Romanenko 
>  wrote:
> 
> 
>> Well, perhaps it’s a bug (I’ll try to check it on my side later).
>> 
>> Can you try to annotate InboundData class with 
>> “@DefaultCoder(AvroCoder.class)” for example and see if it will work?
>> 
>>> On 23 Mar 2023, at 15:13, Juan Cuzmar jcuz...@protonmail.com wrote:
>>> 
>>> Alexey yes! here it is:
>>> 
>>> import lombok.*;
>>> import lombok.extern.jackson.Jacksonized;
>>> 
>>> import java.io.Serializable;
>>> import java.util.List;
>>> import java.util.Map;
>>> 
>>> @Jacksonized
>>> @Builder
>>> @Getter
>>> @Value
>>> @EqualsAndHashCode
>>> @ToString
>>> public class InboundData implements Serializable {
>>> 
>>> String payload;
>>> Map attributes;
>>> List stores;
>>> }
>>> 
>>> --- Original Message ---
>>> On Thursday, March 23rd, 2023 at 8:14 AM, Alexey Romanenko 
>>> aromanenko@gmail.com wrote:
>>> 
>>>> Could you share a class declaration of your InboundData class? Is it just 
>>>> a POJO?
>>>> 
>>>> —
>>>> Alexey
>>>> 
>>>>> On 23 Mar 2023, at 08:16, Juan Cuzmar jcuz...@protonmail.com wrote:
>>>>> 
>>>>> Hello all,
>>>>> 
>>>>> I hope this message finds you well. I am currently working with Apache 
>>>>> Beam's JdbcIO and need some guidance regarding setting a coder for the 
>>>>> input data without resorting to the deprecated withCoder method. I've 
>>>>> been trying to resolve this issue and would appreciate any insights or 
>>>>> recommendations from the community.
>>>>> 
>>>>> Here's a snippet of my code:
>>>>> 
>>>>> .apply("Inserting", JdbcIO.readAll()
>>>>> .withDataSourceProviderFn(DataSourceProvider.of(dbConfig))
>>>>> ...
>>>>> .withRowMapper(resultSet -> {
>>>>> // Mapping logic here
>>>>> }).withOutputParallelization(false)
>>>>> ).setCoder(SerializableCoder.of(InboundData.class))
>>>>> 
>>>>> When I run the code, I encounter the following error:
>>>>> 
>>>>> java.lang.IllegalStateException: Unable to infer a coder for 
>>>>> JdbcIO.readAll() transform.
>>>>> ...
>>>>> 
>>>>> I understand that the withCoder method is marked as deprecated and should 
>>>>> be avoided. Therefore, I would like to know the recommended way to set 
>>>>> the coder for JdbcIO, considering the deprecated status of withCoder.
>>>>> 
>>>>> If there are any specific coding practices or conventions that I should 
>>>>> follow when posting code snippets or emphasizing certain aspects, please 
>>>>> let me know, and I'll be sure to adhere to them in the future.
>>>>> 
>>>>> Thank you for your time and assistance.
>>>>> 
>>>>> Best regards!



Re: Recommended way to set coder for JdbcIO with Apache Beam

2023-03-23 Thread Alexey Romanenko
Well, perhaps it’s a bug (I’ll try to check it on my side later). 

Can you try to annotate InboundData class with “@DefaultCoder(AvroCoder.class)” 
for example and see if it will work?
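
For clarity, the suggested annotation would look roughly like this on a simplified 
version of the class (the field types here are placeholders, since the generic 
parameters are not shown above):

  @DefaultCoder(AvroCoder.class)
  public class InboundData implements Serializable {
      String payload;
      Map<String, String> attributes;
      List<String> stores;
  }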

> On 23 Mar 2023, at 15:13, Juan Cuzmar  wrote:
> 
> Alexey yes! here it is:
> 
> 
>import lombok.*;
>import lombok.extern.jackson.Jacksonized;
> 
>import java.io.Serializable;
>import java.util.List;
>import java.util.Map;
> 
>@Jacksonized
>@Builder
>@Getter
>@Value
>@EqualsAndHashCode
>@ToString
>public class InboundData implements Serializable {
> 
>String payload;
>Map attributes;
>List stores;
>}
> 
> 
> --- Original Message ---
> On Thursday, March 23rd, 2023 at 8:14 AM, Alexey Romanenko 
>  wrote:
> 
> 
>> Could you share a class declaration of your InboundData class? Is it just a 
>> POJO?
>> 
>> —
>> Alexey
>> 
>>> On 23 Mar 2023, at 08:16, Juan Cuzmar jcuz...@protonmail.com wrote:
>>> 
>>> Hello all,
>>> 
>>> I hope this message finds you well. I am currently working with Apache 
>>> Beam's JdbcIO and need some guidance regarding setting a coder for the 
>>> input data without resorting to the deprecated withCoder method. I've been 
>>> trying to resolve this issue and would appreciate any insights or 
>>> recommendations from the community.
>>> 
>>> Here's a snippet of my code:
>>> 
>>> .apply("Inserting", JdbcIO.readAll()
>>> .withDataSourceProviderFn(DataSourceProvider.of(dbConfig))
>>> ...
>>> .withRowMapper(resultSet -> {
>>> // Mapping logic here
>>> }).withOutputParallelization(false)
>>> ).setCoder(SerializableCoder.of(InboundData.class))
>>> 
>>> When I run the code, I encounter the following error:
>>> 
>>> java.lang.IllegalStateException: Unable to infer a coder for 
>>> JdbcIO.readAll() transform.
>>> ...
>>> 
>>> I understand that the withCoder method is marked as deprecated and should 
>>> be avoided. Therefore, I would like to know the recommended way to set the 
>>> coder for JdbcIO, considering the deprecated status of withCoder.
>>> 
>>> If there are any specific coding practices or conventions that I should 
>>> follow when posting code snippets or emphasizing certain aspects, please 
>>> let me know, and I'll be sure to adhere to them in the future.
>>> 
>>> Thank you for your time and assistance.
>>> 
>>> Best regards!



Re: Recommended way to set coder for JdbcIO with Apache Beam

2023-03-23 Thread Alexey Romanenko
Could you share a class declaration of your InboundData class? Is it just a 
POJO?

—
Alexey

> On 23 Mar 2023, at 08:16, Juan Cuzmar  wrote:
> 
> Hello all,
> 
> I hope this message finds you well. I am currently working with Apache Beam's 
> JdbcIO and need some guidance regarding setting a coder for the input data 
> without resorting to the deprecated withCoder method. I've been trying to 
> resolve this issue and would appreciate any insights or recommendations from 
> the community.
> 
> Here's a snippet of my code:
> 
>.apply("Inserting", JdbcIO.readAll()
>.withDataSourceProviderFn(DataSourceProvider.of(dbConfig))
>...
>.withRowMapper(resultSet -> {
>// Mapping logic here
>}).withOutputParallelization(false)
> ).setCoder(SerializableCoder.of(InboundData.class))
> 
> When I run the code, I encounter the following error:
> 
> 
>java.lang.IllegalStateException: Unable to infer a coder for 
> JdbcIO.readAll() transform.
>...
> 
> 
> I understand that the withCoder method is marked as deprecated and should be 
> avoided. Therefore, I would like to know the recommended way to set the coder 
> for JdbcIO, considering the deprecated status of withCoder.
> 
> If there are any specific coding practices or conventions that I should 
> follow when posting code snippets or emphasizing certain aspects, please let 
> me know, and I'll be sure to adhere to them in the future.
> 
> Thank you for your time and assistance.
> 
> Best regards!



Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Alexey Romanenko
+CC people who might give more details on this.

—
Alexey

> On 13 Mar 2023, at 19:32, Godefroy Clair  wrote:
> 
> Hi,
> I am wondering about the way `Flatten()` and `FlatMap()` are implemented in 
> Apache Beam Python.
> In most functional languages, FlatMap() is the same as composing `Flatten()` 
> and `Map()` as indicated by the name, so Flatten() and Flatmap() have the 
> same input.
> But in Apache Beam, Flatten() is using _iterable of PCollections_ while 
> FlatMap() is working with _PCollection of Iterables_.
> 
> If I am not wrong, the signature of Flatten, Map and FlatMap are :
> ```
> Flatten:: Iterable[PCollections[A]] -> PCollection[A]
> Map:: (PCollection[A], (A-> B)) -> PCollection[B]
> FlatMap:: (PCollection[Iterable[A]], (A->B)) -> [A]
> ```
> 
> So my question is is there another "Flatten-like" function  with this 
> signature :
> ```
> anotherFlatten:: PCollection[Iterable[A]] -> PCollection[A]
> ```
> 
> One of the reason this would be useful, is that when you just want to 
> "flatten" a `PCollection` of `iterable` you have to use `FlatMap()`with an 
> identity function.
> 
> So instead of writing:
> `FlatMap(lambda e: e)`
> I would like to use a function
> `anotherFlatten()`
> 
> Thanks,
> Godefroy



Re: Backup event from Kafka to S3 in parquet format every minute

2023-02-17 Thread Alexey Romanenko
Piotr,

> On 17 Feb 2023, at 09:48, Wiśniowski Piotr 
>  wrote:
> Does this mean that Parquet IO does not support partitioning, and we need to 
> do some workarounds? Like explicitly mapping each window to a separate 
> Parquet file?
> 

Could you elaborate a bit more on this? IIRC, we used to read partitioned 
Parquet files with ParquetIO while running TPC-DS benchmark.

—
Alexey

Re: Beam CassandraIO

2023-02-02 Thread Alexey Romanenko
- d...@beam.apache.org
+ user@beam.apache.org 

Hi Enzo,

Can you make sure that all your workers were properly added and listed in Spark 
WebUI?

Did you specify a “ --master spark://HOST:PORT” option while running your Beam 
job with a SparkRunner?

PS: Please, use user@beam.apache.org mailing list for such type of questions.

—
Alexey

> On 2 Feb 2023, at 03:18, Enzo Bonggio  wrote:
> 
> I have a Spark standalone cluster installed on two machines, but once I run 
> spark-submit, it will only execute on one executor. Is that the way it is 
> supposed to work? 
> I thought that I could read from Cassandra with multiple machines 



Re: KafkaIo Metrics

2023-01-20 Thread Alexey Romanenko
IIRC, we don’t expose any Kafka Consumer metrics, so I’m afraid there is 
no easy way to get them in a Beam pipeline.
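
If you do decide to go the commit_offset_in_finalize route and rely on Kafka’s own 
consumer metrics, enabling it looks roughly like this (a sketch; the topic and group 
are placeholders, and a stable group.id is required for committing offsets):

  KafkaIO.<String, String>read()
      .withBootstrapServers(bootstrapServers)
      .withTopic("my-topic")
      .withConsumerConfigUpdates(
          ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"))
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .commitOffsetsInFinalize();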

—
Alexey

> On 18 Jan 2023, at 21:43, Lydian  wrote:
> 
> Hi, 
> I know that Beam KafkaIO doesn't use the native kafka offset, and therefore I 
> cannot use kafka metrics directly. 
> 
> Wondering what would be the right way to expose those metrics of my KafkaIO 
> pipeline? 
> Things I am interested includes: 
> bytes-consumed-rate
> fetch-latency-avg
> records-lag
> commit-rate
> consumer lagWondering how people get these metrics or instead of doing this? 
> or we should just enable `commit_offset_in_finalize` and then use the Kafka 
> metrics directly? 
> 
> also wondering if there's anything to notice when enabling the 
> commit_offset_in_finalize? Thanks! 
> 
> Sincerely,
> Lydian Lee
> 



Re: [Question] Trino Connector

2022-12-26 Thread Alexey Romanenko
Hi Ammar,

I don’t think we have a dedicated connector for Trino. I also didn’t find any 
open Beam GitHub issue for this.

On the other hand, it appears that Trino provides a JDBC client driver [1], so 
you may want to try to use Beam JdbcIO connector [2] in this case. 

[1] https://trino.io/docs/current/client/jdbc.html
[2] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/jdbc/JdbcIO.html
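
For illustration only (not a tested setup), a read through JdbcIO with the Trino JDBC 
driver could look roughly like this; the host, catalog, schema and query are 
placeholders:

  PCollection<Row> rows = pipeline.apply("Read from Trino",
      JdbcIO.readRows()
          .withDataSourceConfiguration(
              JdbcIO.DataSourceConfiguration.create(
                  "io.trino.jdbc.TrinoDriver",
                  "jdbc:trino://trino-host:8080/my_catalog/my_schema")
              .withUsername("user")
              .withPassword("password"))
          .withQuery("SELECT name, cnt FROM my_table"));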

—
Alexey

> On 23 Dec 2022, at 23:26, Ammar Alrashed  wrote:
> 
> Hello,
> 
> Have anyone tried to connect Trino with Beam?
> I don't see a Trino connector here 
> , so is there another 
> connector that can be used?
> 
> Is anybody working on a Trino connector?
> 
> Thanks



Re: Beam, Flink state and Avro Schema Evolution is problematic

2022-11-23 Thread Alexey Romanenko
+ dev

Many thanks for sharing your observations and findings on this topic, Cristian!
I’m copying it to dev@ as well to attract more attention to this problem.

—
Alexey


> On 18 Nov 2022, at 18:21, Cristian Constantinescu  wrote:
> 
> Hi everyone,
> 
> I'm using Beam on Flink with Avro generated records. If the record
> schema changes, the Flink state cannot be restored. I just want to
> send this email out for anyone who may need this info in the future
> and also ask others for possible solutions as this problem is so
> easily hit, that I'm having a hard time figuring out what other users
> of Beam running on the Flink runner are doing to circumvent it.
> 
> The in-depth discussion of the issue can be found here [1] (thanks
> Maximilian). There are also a few more emails about this here [2], and
> here [3].
> 
> The gist of the issue is that Beam serializes the coders used into the
> Flink state, and some of those coders hold references to the
> Bean/Pojos/Java classes they serialize/deserialize to. Flink
> serializes its state using Java serialization, that means that in the
> Flink state we will get a reference to the Bean/Pojo/Java class name
> and the related serialVersionUID. When the pojo (Avro generated)
> changes, so does its serialVersionUID, and Flink cannot deserialize
> the Beam state anymore because the serialVersionUID doesn't match, not
> on the Coder, but on the Pojo type that coder was holding when it got
> serialized.
> 
> I decided to try each coder capable of handling Pojos, one by one, to
> see if any would work. That is, I tried the SerializableCoder,
> AvroCoder and the SchemaCoder/RowCoder. In the case of AvroCoder and
> SerializableCoder, I have used the SpecificRecord version (not the
> GenericRecord one) and the non-Row (ie: the one that returns a Pojo
> type, not Row type) version respectively. They all failed the below
> test (added it to be very explicit, but really, it's just simple
> schema evolution).
> 
> Test:
> 1. Create a avro pojo (idl generated pojo):
> record FooRecord {
> union {null, string} dummy1 = null;
> }
> 2. Create a pipeline with a simple stateful DoFn, set desired coder
> for FooRecord (I tried the SerializableCoder, AvroCoder and the
> SchemaCoder/RowCoder), and populate state with a few FooRecord
> objects.
> 3. Start the pipeline
> 4. Stop the pipeline with a savepoint.
> 5. Augment FooRecord to add another field after dummy1.
> 6. Start the pipeline restoring from the saved savepoint.
> 7. Observed this exception when deserializing the savepoint -->
> "Caused by: java.io.InvalidClassException: com.mymodels.FooRecord;
> local class incompatible: stream classdesc serialVersionUID = , local class serialVersionUID = "
> 
> There are a few workarounds.
> 
> Workaround A:
> Right now my working solution is to implement what was suggested by
> Pavel (thanks Pavel) in [3]. Quote from him "having my business
> logic-related POJOs still Avro-generated, but I introduced another,
> generic one, which just stores schema & payload bytes, and does not
> need to change. then using a DelegateCoder that converts the POJO
> to/from that generic schema-bytes pojo that never changes".
> 
> Basically something like this (pseudocode):
> record FlinkStateValue {
> string schema;
> bytes value;
> }
> 
> var delegateCoder = DelegateCoder.of(
> AvroCoder.of(FlinkStateValue.class),
> (FooRecord in) ->
> FlinkStateValue.setSchema(FooRecord.getSchema()).setValue(AvroCoder.of(FooRecord.class).encode(in)),
> (FlinkStateValue in) -> return
> AvroCoder.of(FooRecord.class).decode(in.getValue())
> ) ;
> 
> p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder)
> 
> The downside is that now there's yet another deserialization step,
> which wastes CPU cycles. The upside is that things are decoupled, that
> is, I think the DelegateCoder could use a RowCoder.of(FooRecord)
> instead of the AvroCoder.of(FooRecord), or any other coder for that
> matter and you can change between them with only a code change.
> 
> Workaround B:
> Difficulty hard! Use the Flink state api [4] and update the Beam
> serialized state to modify the FooRecord serialVersionUID stored in
> that state to the new one after the schema evolution, then save the
> state and start your pipeline with the evolved FooRecord.
> 
> Workaround C:
> Wrap the Avro generated FooRecord to a real Pojo or AutoValue or
> anything that you have full control over serialVersionUID, and use
> that in your pipeline especially when putting things into the state.
> 
> Problem arises when the Avro generated records have lots of properties
> and or nested records. It becomes tedious to essentially duplicate
> them to Pojo/AutoValue.
> 
> Conclusion:
> I want to end by asking advice from the community. For those of you
> who use Beam with Avro records running on the Flink runner, how do you
> handle state when the Avro schema inevitably evolves?
> 
> It just seems like it's such a simple use case and such an easy
> pittrap
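
For reference, workaround A above could be wired up roughly like this (a sketch only, 
not the author's exact code, assuming FlinkStateValue is the Avro-generated wrapper 
with "schema" and "value" fields; DelegateCoder and AvroCoder live in 
org.apache.beam.sdk.coders, CoderUtils in org.apache.beam.sdk.util):

  DelegateCoder<FooRecord, FlinkStateValue> fooRecordCoder =
      DelegateCoder.of(
          AvroCoder.of(FlinkStateValue.class),
          (FooRecord in) ->
              FlinkStateValue.newBuilder()
                  .setSchema(FooRecord.getClassSchema().toString())
                  .setValue(ByteBuffer.wrap(
                      CoderUtils.encodeToByteArray(AvroCoder.of(FooRecord.class), in)))
                  .build(),
          (FlinkStateValue in) -> {
            // Copy the bytes out of the Avro ByteBuffer before decoding.
            ByteBuffer bb = in.getValue().duplicate();
            byte[] bytes = new byte[bb.remaining()];
            bb.get(bytes);
            return CoderUtils.decodeFromByteArray(AvroCoder.of(FooRecord.class), bytes);
          });

  pipeline.getCoderRegistry().registerCoderForClass(FooRecord.class, fooRecordCoder);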

Re: Reading from AWS Kinesis Stream Cross account

2022-11-14 Thread Alexey Romanenko
If I’m not mistaken, it’s not supported in the current implementation of 
KinesisIO.

PS: Btw, if you still use the KinesisIO connector based on AWS SDK v1 [1], it’s 
highly recommended to switch to the one based on AWS SDK v2 [2] since the former 
is deprecated.

[1] 
https://github.com/apache/beam/blob/master/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
[2] 
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
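
For reference, a minimal read with the AWS SDK v2 connector from [2] looks roughly 
like this (a sketch; the stream name is a placeholder, and region/credentials are 
picked up from AwsOptions rather than a custom clients provider):

  p.apply(KinesisIO.read()
      .withStreamName("my-stream")
      .withInitialPositionInStream(InitialPositionInStream.LATEST));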

—
Alexey

> On 11 Nov 2022, at 12:34, Sankar Subramaniam 
>  wrote:
> 
> Hello there,
>  
> Good morning.
>  
> We are using Apache Beam (Java SDK 2.35.0) in our data pipeline to read from 
> AWS Kinesis Stream using AWS KDA (Kinesis Data Analytics) and so far it’s 
> working fine for few data pipelines.
>  
> Now we have got a new requirement that AWS KDA (running an application 
> implemented using Apache Bean SDK 2.35.0) needs to read the data from AWS 
> Kinesis Stream in different account.
>  
> I have followed AWS Documentation 
> 
>  to grant required permission and policies for the AWS KDA and in Apache Bean 
> implementation we have something like below,
>  
> KinesisIO.read()
> .withStreamName(getInputPattern())
> .withAWSClientsProvider(
> new KinesisClientsProvider(
> Regions.fromName(getAwsRegion()),
> getAwsCredentialsProvider(),
> getAwsVerifyCertificate(),
> getAwsKinesisServiceEndpoint(),
> getAwsCloudwatchServiceEndpoint()))
>  
> Here, we could only set the Kinesis Stream and not the ARN. With the above 
> implementation, this application couldn’t read from the stream and from the 
> logs we are seeing it’s trying to connect to the stream in the same AWS 
> Account. The ARN formed using streamName assumes it’s in the same AWS Account 
> whereas we want to connect to Kinesis Stream in another AWS Account.
>  
> Note: We are using ‘DefaultAWSCredentialsProviderChain’.
>  
> With this situation, wondering am I missing something / doing incorrectly 
> here. Could you please give us some pointers how to use Beam to read from a 
> (Kinesis)Stream in different AWS Account. Thanks.
>  
> Regards,
> Sankar



Re: Request to suggest alternative approaches for side input use cases in apache beam

2022-10-26 Thread Alexey Romanenko
Well, it depends on how you use the Redis cache and how often it changes. 

For example, if you need to query the cache for a group of input records, you 
can group them into batches and make only one remote call to the cache before 
processing each batch, as explained here [1] and sketched below.

In any case, more details about your use case and why the side input approach 
doesn’t work well for you would be helpful.

[1] 
https://beam.apache.org/documentation/patterns/grouping-elements-for-efficient-external-service-calls/
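
A minimal sketch of that batching pattern (the cache client and its mget(...) call 
are hypothetical placeholders for your Redis client) could look like this:

  input  // PCollection<KV<String, String>>, keyed however makes sense for your data
      .apply(GroupIntoBatches.<String, String>ofSize(100))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          Iterable<String> batch = c.element().getValue();
          // One remote call to the cache for the whole batch.
          Map<String, String> cached = cacheClient.mget(batch);
          for (String value : batch) {
            c.output(KV.of(c.element().getKey(), cached.get(value)));
          }
        }
      }));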
 


—
Alexey

> On 29 Sep 2022, at 15:10, Chinni, Madhavi via user  
> wrote:
> 
> Hi,
>  
> We have a stream processing pipeline which process the customer UI 
> interactions data .
> As part of the pipeline we read the information from AWS redis cache and 
> store it in a PCollectionView. The PCollectionView is accessed as side input 
> in the next CombineFnWithContext accumulators and transform functions in the 
> pipeline.
> Could you please suggest an alternative approach where we can avoid using 
> side input for accessing redis cache information in next functions in the 
> pipeline.
>  
> Thanks,
> Madhavi



Re: RabbitMQ Message Print

2022-10-26 Thread Alexey Romanenko
Hi Hitesh,

Let me ask you some questions on this:
1) Did you try to run this pipeline with Direct runner locally?
2) Can you make sure that the output messages were successfully written to the same 
queue that you then read from?
3) Can you make sure that your “processElement()” is actually being called when you 
read the messages?

I’d recommend running this pipeline with a local instance of RabbitMQ and the 
Direct runner. If you can reproduce the issue locally, it will be easier to debug. 

—
Alexey

> On 15 Oct 2022, at 19:53, chakranthi hitesh  wrote:
> 
> Hi Everyone,
> 
> I have been using Apache Beam in the recent past. Recently, my work required 
> me to implement a source and sink connector for RabbitMQ.
> Apache Beam version: 2.38, Java SDK (Java 1.8), Direct Runner
> 
> I'm able to make a successful connection to the RabbitMQ queue and send some 
> messages into the queue using standalone producer code.
> 
> When I try to consume those messages using a Beam pipeline, I am not able to 
> print out the messages. In the output console, the code executes 
> continuously in debug mode without printing any messages.
> 
> Here is the Code I'm using:
> 
> package org.rabbit;
> 
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
> import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> 
> public class RabbitConsumer {
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create();
>     String serverUri = "amqp://user:password@host:port/virtual_host";
>     p.apply("Read from rabbit", RabbitMqIO.read()
>         .withUri(serverUri)
>         .withExchange("exchange_name", "routing_key")
>         .withQueue("queue_name"))
>      .apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
>        @ProcessElement
>        public void processElement(ProcessContext c) {
>          // getBody() returns byte[], so build a String from it rather than calling toString()
>          String data = new String(c.element().getBody());
>          System.out.println("Reading Message from Queue: " + data);
>          c.output(data);
>        }
>      }));
> 
>     p.run().waitUntilFinish();
>   }
> }
> 
> 
> The message I'm putting into the RabbitMQ queue is in XML format.
> I'm guessing this has to do with serialization. 
> 
> 
> If someone has worked on RabbitMqMessage serialization or encountered this 
> problem before, any help would be greatly appreciated.
> 
> Regards
> Hitesh
> 
> 



Re: Java + Python Xlang pipeline

2022-10-11 Thread Alexey Romanenko
I’m not sure that I get it correctly. What do you mean by “worker pool” in your 
case?

—
Alexey

> On 8 Oct 2022, at 03:24, Xiao Ma  wrote:
> 
> Hello, 
> 
> I would like to run a pipeline with Java as the main language and python 
> transformation embedded. The beam pipeline is running on the flink cluster. 
> Currently, I can run it with a taskmanager + java worker pool and a python 
> worker pool. Could I ask if there is a way to run the java code on the task 
> manager directly and keep the python worker pool?
> 
> Current: taskmanager + java worker pool + python worker pool
> Desired: taskmanager + python worker pool
> 
> Thank you very much.
> 
> Mark Ma
> 



Re: Cross Language

2022-10-11 Thread Alexey Romanenko
Yes, it’s possible, though the Java IO connector has to support being used via 
X-language.

For more details regarding which connectors support this, you may want to take 
a look at the IO connectors table on this page [1] and check whether the required 
connector is supported “via X-language” for the Python SDK. 

[1] https://beam.apache.org/documentation/io/connectors/ 

—
Alexey

> On 11 Oct 2022, at 11:06, phani geeth  wrote:
> 
> Hi,
> 
> Does the present cross Language functionality support creating custom Java 
> transforms and calling from python in Dataflow runner.
> 
> Use case: use existing Java IO as cross Language transform and call in python 
> pipeline in Dataflow runner.
> 
> 
> Regards
> Phani Geeth



Re: [Question] Beam 2.42.0 Release Date Confirmation

2022-10-03 Thread Alexey Romanenko
It’s still in the process of release validation since there are not enough votes at 
the moment:
https://lists.apache.org/thread/grgybp1m1mqx9rdy65czbh0wr1fz0ovg 


Once the release is voted on and approved, it will be published and 
announced on the mailing list. It may take an additional 1-2 days.   

—
Alexey

> On 3 Oct 2022, at 11:43, Varun Chopra via user  wrote:
> 
> Classification: For internal use only
>  
> Hi Team,
>  
> Any confirmation on the 2.42.0 release?
>  
> Please let us know I think its been 72 hours for RC1 also.
>  
> Thanks,
> Varun 
>  
> From: Evan Galpin  
> Sent: Thursday, September 29, 2022 2:15 AM
> To: user@beam.apache.org
> Subject: Re: [Question] Beam 2.42.0 Release Date Confirmation
>  
> Hi there :-)
> 
> You can follow the vote on the first release candidate for 2.42.0 here: 
> https://lists.apache.org/thread/ho2mvvgc253my1ovqmrxjpql8gvz0285 
> 
>  
> Thanks,
> Evan
>  
> On Tue, Sep 27, 2022 at 7:16 AM Varun Chopra via user  > wrote:
> Classification: Public
>  
> Hi Team,
>  
> We at Deutsche Bank are using Apache Beam SDK 2.41.0 and it has 
> Vulnerabilities of netty-codec under beam-vendor-grpcjar.
>  
> We checked the Github fixes and seems like the code is fixed and merged.
>  
> We wanted to understand when can we expect Apache Beam SDK 2.42.0 Version, so 
> that we can start using it.
>  
> We have some urgent requirement for this fix, If you can provide some dates 
> that will help.
>  
> Kind Regards,
> Varun Chopra
>  
> 
> 



Re: [Question] Handling failed records when using JdbcIO

2022-09-19 Thread Alexey Romanenko
Hi,

I don’t think it’s possible “out-of-the-box” now, but it could be a useful 
add-on for the JdbcIO connector (a dead-letter PCollection) since, IIRC, it has 
already been requested several times by users. For the moment, it’s only possible 
to play with RetryStrategy/RetryConfiguration in case of failures.
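
For illustration, a write with an explicit retry configuration could look roughly 
like this (a sketch; MyRow, the statement and the values are placeholders, and 
Duration is org.joda.time.Duration):

  rows.apply(JdbcIO.<MyRow>write()
      .withDataSourceConfiguration(dataSourceConfiguration)
      .withStatement("INSERT INTO my_table (id, name) VALUES (?, ?)")
      .withPreparedStatementSetter((element, statement) -> {
        statement.setLong(1, element.getId());
        statement.setString(2, element.getName());
      })
      // Retry only on a specific SQL state, up to 5 attempts with backoff
      // (5 minutes max cumulative backoff, starting at 1 second).
      .withRetryStrategy(e -> "40001".equals(e.getSQLState()))
      .withRetryConfiguration(JdbcIO.RetryConfiguration.create(
          5, Duration.standardMinutes(5), Duration.standardSeconds(1))));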

—
Alexey

> On 18 Sep 2022, at 12:40, Yomal de Silva  wrote:
> 
> Hi all,
> 
> I am trying to figure out the right approach to handling failed records when 
> persisting to a database through the JdbcIO sink. We have created a DoFn to 
> do this task through a PreparedStatement and catch any exceptions then send 
> it through side output for further processing if required. Is there any 
> built-in way of handling this? 
> 
> Thank you.



Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-16 Thread Alexey Romanenko
Thanks Evan for getting back and great that it was resolved by configuration 
tweaks!

—
Alexey

> On 16 Sep 2022, at 16:46, Evan Galpin  wrote:
> 
> Following up to close the loop.  I believe the Kafka errors I was seeing were 
> a red herring.  The actual root cause of the issues was worker nodes running 
> out of memory, and as a result kafka producers would have difficulty 
> competing for resources over GC thrashing.  Increasing the worker node size 
> to where there are no longer OOMKills has removed any kafka "issue".
> 
> Thanks all for your time and willingness to help.
> 
> Evan
> 
> On Tue, Sep 13, 2022 at 12:33 PM Evan Galpin  <mailto:egal...@apache.org>> wrote:
> There's a few related log lines, but there isn't a full stacktrace as the 
> info originates from a logger statement[1] as opposed to thrown exception. 
> The related log lines are like so:
> 
> org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109] 
> Disconnecting from node 10 due to socket connection setup timeout. The 
> timeout value is 11436 ms.[2]
> 
> and
> 
> org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109] Node 
> 10 disconnected.[3]
> 
> [1] 
> https://github.com/apache/kafka/blob/f653cb7b5889fd619ab0e6a25216bd981a9d82bf/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>  
> <https://github.com/apache/kafka/blob/f653cb7b5889fd619ab0e6a25216bd981a9d82bf/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402>
>  
> [2] 
> https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L820
>  
> <https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L820>
> [3] 
> https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L937
>  
> <https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L937>
> On Tue, Sep 13, 2022 at 12:17 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Do you have by any chance the full stacktrace of this error?
> 
> —
> Alexey
> 
>> On 13 Sep 2022, at 18:05, Evan Galpin > <mailto:egal...@apache.org>> wrote:
>> 
>> Ya likewise, I'd expect this to be handled in the Kafka code without the 
>> need for special handling by Beam.  I'll reach out to Kafka mailing list as 
>> well and try to get a better understanding of the root issue.  Thanks for 
>> your time so far John! I'll ping this thread with any interesting findings 
>> or insights.
>> 
>> Thanks,
>> Evan
>> 
>> On Tue, Sep 13, 2022 at 11:38 AM John Casey via user > <mailto:user@beam.apache.org>> wrote:
>> In principle yes, but I don't see any Beam level code to handle that. I'm a 
>> bit surprised it isn't handled in the Kafka producer layer itself.
>> 
>> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin > <mailto:egal...@apache.org>> wrote:
>> I'm not certain based on the logs where the disconnect is starting.  I have 
>> seen TimeoutExceptions like that mentioned in the SO issue you linked, so if 
>> we assume it's starting from the kafka cluster side, my concern is that the 
>> producers don't seem to be able to gracefully recover.  Given that 
>> restarting the pipeline (in this case, in Dataflow) makes the issue go away, 
>> I'm under the impression that producer clients in KafkaIO#write can get into 
>> a state that they're not able to recover from after experiencing a 
>> disconnect.  Is graceful recovery after cluster unavailability something 
>> that would be expected to be supported by KafkaIO today?
>> 
>> Thanks,
>> Evan
>> 
>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user > <mailto:user@beam.apache.org>> wrote:
>> Googling that error message returned 
>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>  
>> <https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout>
>> and 
>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>  
>> <https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402>
>> 
>> Which suggests that th

Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-13 Thread Alexey Romanenko
Do you have by any chance the full stacktrace of this error?

—
Alexey

> On 13 Sep 2022, at 18:05, Evan Galpin  wrote:
> 
> Ya likewise, I'd expect this to be handled in the Kafka code without the need 
> for special handling by Beam.  I'll reach out to Kafka mailing list as well 
> and try to get a better understanding of the root issue.  Thanks for your 
> time so far John! I'll ping this thread with any interesting findings or 
> insights.
> 
> Thanks,
> Evan
> 
> On Tue, Sep 13, 2022 at 11:38 AM John Casey via user  > wrote:
> In principle yes, but I don't see any Beam level code to handle that. I'm a 
> bit surprised it isn't handled in the Kafka producer layer itself.
> 
> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin  > wrote:
> I'm not certain based on the logs where the disconnect is starting.  I have 
> seen TimeoutExceptions like that mentioned in the SO issue you linked, so if 
> we assume it's starting from the kafka cluster side, my concern is that the 
> producers don't seem to be able to gracefully recover.  Given that restarting 
> the pipeline (in this case, in Dataflow) makes the issue go away, I'm under 
> the impression that producer clients in KafkaIO#write can get into a state 
> that they're not able to recover from after experiencing a disconnect.  Is 
> graceful recovery after cluster unavailability something that would be 
> expected to be supported by KafkaIO today?
> 
> Thanks,
> Evan
> 
> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user  > wrote:
> Googling that error message returned 
> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>  
> 
> and 
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>  
> 
> 
> Which suggests that there is some sort of disconnect happening between your 
> pipeline and your kafka instance.
> 
> Do you see any logs when this disconnect starts, on the Beam or Kafka side of 
> things?
> 
> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin  > wrote:
> Thanks for the quick reply John!  I should also add that the root issue is 
> not so much the logging, rather that these log messages seem to be correlated 
> with periods where producers are not able to publish data to kafka.  The 
> issue of not being able to publish data does not seem to resolve until 
> restarting or updating the pipeline.
> 
> Here's my publisher config map:
> 
> .withProducerConfigUpdates(
> Map.ofEntries(
> Map.entry(
> ProducerConfig.PARTITIONER_CLASS_CONFIG, 
> DefaultPartitioner.class),
> Map.entry(
> ProducerConfig.COMPRESSION_TYPE_CONFIG, 
> CompressionType.GZIP.name ),
> Map.entry(
> CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
> SecurityProtocol.SASL_SSL.name ),
> Map.entry(
> SaslConfigs.SASL_MECHANISM, 
> PlainSaslServer.PLAIN_MECHANISM),
> Map.entry(
> SaslConfigs.SASL_JAAS_CONFIG, 
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username=\"\" password=\"\";")))
> 
> Thanks,
> Evan
> 
> On Tue, Sep 13, 2022 at 10:30 AM John Casey  > wrote:
> Hi Evan,
> 
> I haven't seen this before. Can you share your Kafka write configuration, and 
> any other stack traces that could be relevant? 
> 
> John
> 
> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin  > wrote:
> Hi all,
> 
> I've recently started using the KafkaIO connector as a sink, and am new to 
> Kafka in general.  My kafka clusters are hosted by Confluent Cloud.  I'm 
> using Beam SDK 2.41.0.  At least daily, the producers in my Beam pipeline are 
> getting stuck in a loop frantically logging this message:
> 
> Node n disconnected.
> 
> Resetting the last seen epoch of partition  to x since the 
> associated topicId changed from null to 
> 
> Updating the running pipeline "resolves" the issue I believe as a result of 
> recreating the Kafka producer clients, but it seems that as-is the KafkaIO 
> producer clients are not resilient to node disconnects.  Might I be missing a 
> configuration option, or are there any known issues like this?
> 
> Thanks,
> Evan



Re: read messages from kakfa: 2 different message types in kafka topic

2022-08-10 Thread Alexey Romanenko
Oops, my bad, I misread the initial question - Moritz pointed out that you have 
only one topic with two different schemas… 

I don’t think it’s supported by KafkaIO “out-of-the-box”. In this case, you 
need either to write your own deserialiser that distinguishes the schemas 
for every input message, split this topic into two so that each topic contains 
messages with only one schema, or use an Avro union as was suggested above.
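
If the values are read as GenericRecord (so that the registry resolves the writer 
schema per message), one way to then separate the two kinds of records downstream is 
a multi-output ParDo keyed on the schema name - a rough sketch with hypothetical 
schema names:

  final TupleTag<GenericRecord> typeA = new TupleTag<GenericRecord>() {};
  final TupleTag<GenericRecord> typeB = new TupleTag<GenericRecord>() {};

  PCollectionTuple split = records.apply(ParDo.of(
      new DoFn<GenericRecord, GenericRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          GenericRecord record = c.element();
          if ("com.example.TypeA".equals(record.getSchema().getFullName())) {
            c.output(record);            // main output: TypeA records
          } else {
            c.output(typeB, record);     // additional output: TypeB records
          }
        }
      }).withOutputTags(typeA, TupleTagList.of(typeB)));

  PCollection<GenericRecord> aRecords = split.get(typeA);
  PCollection<GenericRecord> bRecords = split.get(typeB);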

—
Alexey

> On 10 Aug 2022, at 15:03, Alexey Romanenko  wrote:
> 
> If you have two topics with different schemas in your pipeline then you need 
> to read them separately with two different KafkaIO instances and configure 
> every instance with a proper deserialiser based on its schema.
> 
> —
> Alexey
> 
>> On 9 Aug 2022, at 22:28, Sigalit Eliazov > <mailto:e.siga...@gmail.com>> wrote:
>> 
>> Thanks for your response
>> we have different messages with separate schemas.
>> 
>> I'll review the suggested solution.
>> BR
>> Sigalit
>> 
>> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack > <mailto:mm...@talend.com>> wrote:
>> Hi Sigalit,
>> 
>>  
>> 
>> Could you explain a bit more in detail what you mean by 2 different types of 
>> messages?
>> 
>> Do they share the same schema, e.g. using a union / one of type? Or are you 
>> in fact talking about different messages with separate schemas (e.g. 
>> discriminated using a message header)?
>> 
>>  
>> 
>> The recommended usage (at least with Confluent) is to use one schema per 
>> topic. Using the Confluent registry it’s fairly simple then:
>> 
>>  
>> 
>>  .withValueDeserializer(
>> 
>> 
>> ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject, null /* 
>> latest */, config)))
>> 
>>  
>> 
>> Most likely you have to implement a similar DeserializerProvider for 
>> Apicurio. You could also try using  apicurio.registry.as-confluent, but that 
>> requires to configure your producers accordingly.
>> 
>> In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. 
>> That should lead you a path forward.
>> 
>>  
>> 
>> Best,
>> 
>> Moritz
>> 
>>  
>> 
>> On 09.08.22, 13:08, "Sigalit Eliazov" > <mailto:e.siga...@gmail.com>> wrote:
>> 
>>  
>> 
>> 
>> Hi all
>> 
>> we have a single kafka topic which is used to receive 2 different types of 
>> messages.
>> 
>> These 2 messages are Avro.
>> 
>> So when reading messages from kafka i used the GenericRecord
>> 
>>  
>> 
>> KafkaIO.read()
>> .withBootstrapServers(bootstrapServers)
>> .withTopic(topic)
>> .withConsumerConfigUpdates(ImmutableMap.of(
>> SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
>> ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
>> SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
>> ))
>> .withKeyDeserializer(StringDeserializer.class)
>> I am not sure how to define the withValueDeserializer and coder.
>> i tried to read the message as GenericRecord but it fails with
>>  "Could not extract the Kafka Deserializer type from class 
>> io.apicurio.registry.serde.avro.AvroKafkaDeserialize" 
>> i am using apicurio as the schema registry
>>  
>> Thanks
>> Sigalit
>> As a recipient of an email from Talend, your contact personal data will be 
>> on our systems. Please see our privacy notice. 
>> <https://www.talend.com/privacy/>
>> 
> 



Re: read messages from kakfa: 2 different message types in kafka topic

2022-08-10 Thread Alexey Romanenko
If you have two topics with different schemas in your pipeline then you need to 
read them separately with two different KafkaIO instances and configure every 
instance with a proper deserialiser based on its schema.

—
Alexey

> On 9 Aug 2022, at 22:28, Sigalit Eliazov  wrote:
> 
> Thanks for your response
> we have different messages with separate schemas.
> 
> I'll review the suggested solution.
> BR
> Sigalit
> 
> On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack  > wrote:
> Hi Sigalit,
> 
>  
> 
> Could you explain a bit more in detail what you mean by 2 different types of 
> messages?
> 
> Do they share the same schema, e.g. using a union / one of type? Or are you 
> in fact talking about different messages with separate schemas (e.g. 
> discriminated using a message header)?
> 
>  
> 
> The recommended usage (at least with Confluent) is to use one schema per 
> topic. Using the Confluent registry it’s fairly simple then:
> 
>  
> 
>  .withValueDeserializer(
> 
> 
> ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject, null /* 
> latest */, config)))
> 
>  
> 
> Most likely you have to implement a similar DeserializerProvider for 
> Apicurio. You could also try using  apicurio.registry.as-confluent, but that 
> requires to configure your producers accordingly.
> 
> In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. 
> That should lead you a path forward.
> 
>  
> 
> Best,
> 
> Moritz
> 
>  
> 
> On 09.08.22, 13:08, "Sigalit Eliazov"  > wrote:
> 
>  
> 
> 
> Hi all
> 
> we have a single kafka topic which is used to receive 2 different types of 
> messages.
> 
> These 2 messages are Avro.
> 
> So when reading messages from kafka i used the GenericRecord
> 
>  
> 
> KafkaIO.read()
> .withBootstrapServers(bootstrapServers)
> .withTopic(topic)
> .withConsumerConfigUpdates(ImmutableMap.of(
> SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
> ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
> SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
> ))
> .withKeyDeserializer(StringDeserializer.class)
> I am not sure how to define the withValueDeserializer and coder.
> i tried to read the message as GenericRecord but it fails with
>  "Could not extract the Kafka Deserializer type from class 
> io.apicurio.registry.serde.avro.AvroKafkaDeserialize" 
> i am using apicurio as the schema registry
>  
> Thanks
> Sigalit
> As a recipient of an email from Talend, your contact personal data will be on 
> our systems. Please see our privacy notice. 
> 



Re: Possible bug in ConfluentSchemaRegistryDeserializerProvider withe schema evolution

2022-07-29 Thread Alexey Romanenko
Thanks for the question! Interesting...

I haven’t gone deep into the details yet (I will!), but could it be related to this 
change? [1][2]

[1] https://issues.apache.org/jira/browse/BEAM-10759 

[2] https://github.com/apache/beam/pull/12630 


—
Alexey

> On 28 Jul 2022, at 22:43, Cristian Constantinescu  wrote:
> 
> Attaching these two links which kinda point in the same direction as my 
> previous e-mail:
> 
> https://ambitious.systems/avro-writers-vs-readers-schema 
> 
> https://ambitious.systems/avro-schema-resolution 
> 
> 
> On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu  > wrote:
> Hi everyone,
> 
> When using KafkaIO to deserialize to avro SpecificRecords in combination with 
> ConfluentSchemaRegistryDeserializerProvider, it fails when the schema in the 
> avro generated classes (the SpecificRecords) and the schema registry schema 
> (used to serialize the given message) mismatch.
> 
> My scenario is that my Avro generated classes are ahead of what's in the 
> schema registry. So when deserialization happens, it tries to use the schema 
> registry schema to deserialize to the SpecificRecord class and that fails 
> with field order mismatches or field type mismatches depending on the 
> situation.
> 
> I know you're thinking that my schema evolution is bad and that I should go 
> sit in the corner. However, only new fields are added to the schema, so it 
> should not be an issue.
> 
> Has anyone seen this happen to them?
> 
> What I think happens:
> 
> 1. ConfluentSchemaRegistryDeserializerProvider configures the 
> AbstractKafkaDeserializer to use the confluent schema as the reader schema. 
> [1][2]
> 2. If specific.avro.reader is set to true, the 
> ConfluentSchemaRegistryDeserializer (Beam owned) [3] eventually calls 
> AbstractKafkaAvroDeserializer (Confluent owned)[4]. Effectively, the 
> ConfluentSchemaRegistryDeserializer sets the reader schema and the 
> AbstractKafkaAvroDeserializer sets the writer schema. However, both schemas 
> are fetched from the schema registry. Both classes fetch the same schema 
> separately.
> 3. Now this is a problem, because at my understanding, the write schema is 
> used to tell avro what schema was used to serialize the object and the reader 
> schema is used to tell avro what to deserialize those bytes to, in case it's 
> not the same schema. I'm really not sure about this, but I think that's how 
> it works.
> 4. Because both read and write schema are fetched from the schema registry, 
> but our SpecificRecord class has an evolved schema that is not used, 
> deserialization to that class fails.
> 5. This is why I think that if specific.avro.reader is set to true, the 
> ConfluentSchemaRegistryDeserializer class should pass the schema fetch from 
> the SpecificRecord class on line [1].
> 
> Would anyone be able to confirm or infirm the above logic makes sense?
> 
> As for my issue, as a workaround, I just wrote a DeserializerProvider that 
> does exactly what the ConfluentSchemaRegistryDeserializerProvider does, but 
> passes in the schema fetched from my SpecificRecord class and serialization 
> works properly.
> 
> Cheers,
> Cristian
> 
> [1] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L133
> [2] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L144
> [3] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L175
> [4] https://github.com/confluentinc/schema-registry/blob/2c2a356755b000e524123f9676ace98bd3c74f59/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L152
> 


Re: Oracle Database Connection Pool creation from Beam

2022-07-29 Thread Alexey Romanenko
In addition to what Moritz said, I just wanted to give an example of using 
JdbcIO.PoolableDataSourceProvider, taken from the JdbcIO Javadoc:

To customize the building of the DataSource we can provide a 
SerializableFunction. For example if you need to provide a PoolingDataSource 
from an existing JdbcIO.DataSourceConfiguration: you can use a 
JdbcIO.PoolableDataSourceProvider:


 pipeline.apply(JdbcIO.<KV<Integer, String>>read()
   .withDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(
   JdbcIO.DataSourceConfiguration.create(
   "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb",
   "username", "password")))
// ...
 ); 

Did you try this? If yes, could you provide more details about your error?

—
Alexey

> On 29 Jul 2022, at 10:40, Moritz Mack  wrote:
> 
> Could you share some more details what you’ve tried so far?
> I suppose you are using the JdbcIO, right? Have you looked at 
> JdbcIO.PoolableDataSourceProvider?
>  
> / Moritz
>  
>  
> On 28.07.22, 17:35, "Koka, Deepthi via dev"  > wrote:
> Hi Team,
>  
> We have an issue with the Oracle connections being used up and we have tried 
> to implement a pooled data source using PooledDataSourceFactory, somehow we 
> are ending up with “Invalid Universal Connection Pool Configuration: 
> oracle.ucp.UniversalConnectionPoolException: Universal Connection Pool 
> already exists in the Universal Connection Pool Manager.
>  
> Can you please suggest us a standard way of using Pooled data sources in 
> Apache beam?
>  
> Regards,
> Deepthi.
> This email and any files transmitted with it are confidential and intended 
> solely for the use of the addressee. If you are not the intended addressee, 
> then you have received this email in error and any use, dissemination, 
> forwarding, printing, or copying of this email is strictly prohibited. Please 
> notify us immediately of your unintended receipt by reply and then delete 
> this email and your reply. Tyson Foods, Inc. and its subsidiaries and 
> affiliates will not be held liable to any person resulting from the 
> unintended or unauthorized use of any information contained in this email or 
> as a result of any additions or deletions of information originally contained 
> in this email.
> As a recipient of an email from Talend, your contact personal data will be on 
> our systems. Please see our privacy notice. 


Re: RedisIO Apache Beam JAVA Connector

2022-07-20 Thread Alexey Romanenko
I believe that the Read and Write parts of RedisIO are fully independent, and I’m 
not aware of any issues with Write.
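
For what it’s worth, a minimal write sketch (host, port and the input collection are 
placeholders) looks like this:

  kvs  // PCollection<KV<String, String>>
      .apply(RedisIO.write()
          .withEndpoint("redis-host", 6379)
          .withMethod(RedisIO.Write.Method.SET));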

—
Alexey

> On 20 Jul 2022, at 00:52, Shivam Singhal  wrote:
> 
> Hi Alexey!
> 
> Thanks for replying.
> I think we will only use RedisIO to write to redis. From your reply & github 
> issue 21825, it seems SDF is causing some issue in reading from Redis.
> 
> Do you know of any issues with Write? 
> 
> If I get a chance to test the reading in my staging environment, I will :)
> 
> Thanks,
> Shivam Singhal
> 
> On Mon, 18 Jul 2022 at 22:22, Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Shivam,
> 
> RedisIO is already for quite a long time in Beam, so we may consider it’s 
> rather stable. I guess it was marked @Experimental since its user API was 
> changing at that moment (that a point) [1].
> 
> However, recently RedisIO has moved to SDF for a reading part, so I can’t say 
> how it was heavily tested in production system. AFAICT, there is an open 
> issue [2] that is likely related to this.
> 
> It would be great if you could test this IO in your testing enviroment and 
> provide some feedback how it works for your cases.
> 
> —
> Alexey
>  
> [1] https://issues.apache.org/jira/browse/BEAM-9231 
> <https://issues.apache.org/jira/browse/BEAM-9231>
> [2] https://github.com/apache/beam/issues/21825 
> <https://github.com/apache/beam/issues/21825>
> 
> 
>> On 18 Jul 2022, at 02:19, Shivam Singhal > <mailto:shivamsinghal5...@gmail.com>> wrote:
>> 
>> Hi everyone,
>> 
>> I see that org.apache.beam.sdk.io.redis 
>> <https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/redis/package-summary.html>
>>  version 2.20.0 onwards, this connector is marked experimental.
>> 
>> I tried to see the changelog 
>> <https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12346780>
>>  for v2.20.0 but could not find an explanation. 
>> 
>> I am working with apache beam 2.40.0 and wanted to know which classes and 
>> functions are marked experimental in org.apache.beam.sdk.io.redis:2.40.0 ? 
>> Is it safe to use in production environments?
>> 
>> Thanks!
>> 
>> 
> 



Re: RedisIO Apache Beam JAVA Connector

2022-07-18 Thread Alexey Romanenko
Hi Shivam,

RedisIO has been in Beam for quite a long time already, so we may consider it 
rather stable. I guess it was marked @Experimental since its user API was still 
changing at that point [1].

However, RedisIO recently moved to SDF for the reading part, so I can’t say 
how heavily it has been tested in production systems. AFAICT, there is an open issue 
[2] that is likely related to this.

It would be great if you could test this IO in your testing environment and 
provide some feedback on how it works for your cases.

—
Alexey
 
[1] https://issues.apache.org/jira/browse/BEAM-9231
[2] https://github.com/apache/beam/issues/21825


> On 18 Jul 2022, at 02:19, Shivam Singhal  wrote:
> 
> Hi everyone,
> 
> I see that org.apache.beam.sdk.io.redis 
> 
>  version 2.20.0 onwards, this connector is marked experimental.
> 
> I tried to see the changelog 
> 
>  for v2.20.0 but could not find an explanation. 
> 
> I am working with apache beam 2.40.0 and wanted to know which classes and 
> functions are marked experimental in org.apache.beam.sdk.io.redis:2.40.0 ? Is 
> it safe to use in production environments?
> 
> Thanks!
> 
> 



Re: RDD (Spark dataframe) into a PCollection?

2022-05-23 Thread Alexey Romanenko


> On 23 May 2022, at 20:40, Brian Hulette  wrote:
> 
> Yeah I'm not sure of any simple way to do this. I wonder if it's worth 
> considering building some Spark runner-specific feature around this, or at 
> least packaging up Robert's proposed solution? 

I’m not sure that a runner-specific feature is a good way to do this since the 
other runners won’t be able to support it - or am I missing something?

> There could be other interesting integrations in this space too, e.g. using 
> Spark RDDs as a cache for Interactive Beam.

Another option could be to add something like SparkIO (or FlinkIO/whatever) to 
read/write data from/to Spark data structures for such cases (Spark-schema-to-Beam-schema 
conversion could also be supported). And, dreaming a bit more, for those who need a 
mixed pipeline (e.g. Spark + Beam), such connectors could support push-down of pure 
Spark pipelines and then use the result downstream in Beam.

—
Alexey


> 
> Brian
> 
> On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw  > wrote:
> The easiest way to do this would be to write the RDD somewhere then
> read it from Beam.
> 
> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  > wrote:
> >
> > Hi Folks,
> >
> > I know this is not the optimal way to use beam :-) But assume I only use 
> > the spark runner.
> >
> > I have a spark library (very complex) that emits a spark dataframe (or RDD).
> > I also have an existing complex beam pipeline that can do post processing 
> > on the data inside the dataframe.
> >
> > However, the beam part needs a pcollection to start with. The question is, 
> > how can I convert a spark RDD into a pcollection?
> >
> > Thanks
> > -Yushu
> >



Re: RDD (Spark dataframe) into a PCollection?

2022-05-23 Thread Alexey Romanenko
To add a bit more to what Robert suggested: right, in general we can’t read a 
Spark RDD directly with Beam (the Spark runner uses RDDs under the hood, but that’s 
a different story), but you can write the results to any storage and data 
format that Beam supports and then read them with the corresponding Beam IO 
connector.

—
Alexey
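
For illustration, a minimal sketch of this hand-off, assuming the Spark job
persists its DataFrame as Parquet and the Beam pipeline (with the ParquetIO
module on the classpath) reads those files back; the schema, paths and class
name below are hypothetical.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadSparkHandoff {
  public static void main(String[] args) {
    // 1) In the Spark job (outside Beam), persist the DataFrame/RDD first, e.g.:
    //      df.write().parquet("/data/handoff/my_dataset");
    // 2) Then read the same files at the start of the Beam pipeline.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":["
                    + "{\"name\":\"key\",\"type\":\"string\"},"
                    + "{\"name\":\"value\",\"type\":\"long\"}]}");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<GenericRecord> records =
        p.apply("ReadHandoff",
            ParquetIO.read(schema).from("/data/handoff/my_dataset/*.parquet"));
    // ... continue with the existing Beam post-processing transforms on `records` ...
    p.run().waitUntilFinish();
  }
}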

> On 23 May 2022, at 20:35, Robert Bradshaw  wrote:
> 
> The easiest way to do this would be to write the RDD somewhere then
> read it from Beam.
> 
> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  wrote:
>> 
>> Hi Folks,
>> 
>> I know this is not the optimal way to use beam :-) But assume I only use the 
>> spark runner.
>> 
>> I have a spark library (very complex) that emits a spark dataframe (or RDD).
>> I also have an existing complex beam pipeline that can do post processing on 
>> the data inside the dataframe.
>> 
>> However, the beam part needs a pcollection to start with. The question is, 
>> how can I convert a spark RDD into a pcollection?
>> 
>> Thanks
>> -Yushu
>> 



[PROPOSAL] Stop Spark 2 support in Spark Runner

2022-04-29 Thread Alexey Romanenko
Any objections or comments from Spark 2 users on this topic?

—
Alexey


On 20 Apr 2022, at 19:17, Alexey Romanenko  wrote:

Hi everyone,

A while ago, we already discussed on dev@ that there are several reasons to 
stop providing support for Spark 2 in the Spark Runner (in all its variants that we 
have for now - RDD, Dataset, Portable) [1]. In short, it brings a maintenance burden 
to Spark runner support that we would like to avoid in the future.

From the devs’ perspective I don’t see any objections to this. So, I’d like 
to know if there are users that still use Spark 2 for their Beam pipelines and 
for whom it would be critical to keep using it.

Please share your opinion on this!

—
Alexey

[1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco

> On 31 Mar 2022, at 17:51, Alexey Romanenko  wrote:
> 
> Hi everyone,
> 
> For the moment, Beam Spark Runner supports two versions of Spark - 2.x and 
> 3.x. 
> 
> Taking into account several things:
> - almost all cloud providers have already mostly moved to Spark 3.x as the main 
> supported version;
> - the latest Spark 2.x release (Spark 2.4.8, a maintenance release) was done 
> almost a year ago;
> - Spark 3 is considered the mainstream Spark version for development and bug 
> fixing;
> - it is better to avoid the maintenance burden of two versions (there are some 
> incompatibilities between Spark 2 and 3);
> 
> I’d suggest stopping support for Spark 2 in the Spark Runner in one of the 
> next Beam releases. 
> 
> What are your thoughts on this? Are there any principal objections or reasons 
> for not doing this that I probably missed?
> 
> —
> Alexey 
> 
> 


Re: [SURVEY] Deprecation of Beam AWS IOs v1 (Java)

2022-04-26 Thread Alexey Romanenko
+1 to deprecate AWS Java SDK v1 connectors in the next Beam release.

—
Alexey

> On 26 Apr 2022, at 18:51, Moritz Mack  wrote:
> 
> Hi Beam AWS user,
>  
> as you might know, there’s currently two different versions of AWS IO 
> connectors in Beam for the Java SDK:
> amazon-web-services  [1] and kinesis [2] for the AWS Java SDK v1
> amazon-web-services2 (including kinesis) [3] for the AWS Java SDK v2
>  
> With the recent release of Beam 2.38.0 [4] amazon-web-services2 has reached 
> feature parity with version 1 and we encourage users to give it a try if you 
> haven’t done so.
>  
> I’m reaching out to see if there’s any blockers, difficulties or other 
> obstacles preventing users from migrating to version 2.
>  
> Maintaining multiple versions of the same IOs is obviously painful.
> Based on your feedback, we hope to then deprecate the earlier version with 
> one of the next few Beam releases.
>  
> Best,
> Moritz
>  
> [1] 
> https://github.com/apache/beam/tree/master/sdks/java/io/amazon-web-services 
> 
> [2] https://github.com/apache/beam/tree/master/sdks/java/io/kinesis 
> 
> [3] 
> https://github.com/apache/beam/tree/master/sdks/java/io/amazon-web-services2 
> 
> [4] https://beam.apache.org/blog/beam-2.38.0/ 
> 
>  
>  
> As a recipient of an email from Talend, your contact personal data will be on 
> our systems. Please see our privacy notice. 


Re: JdbcIO

2022-04-22 Thread Alexey Romanenko
I don’t think it exists.

Do you really need an unbounded pipeline, meaning that the data will 
continuously arrive? Or would re-running a batch pipeline periodically, or when 
triggered by some external signal, be enough?

—
Alexey  
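
To make the batch alternative concrete, here is a minimal sketch of a
re-runnable batch read with JdbcIO; the driver, connection string, table and
"processed" column are hypothetical placeholders, not part of the original
question.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class QueueTableBatchRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Each scheduled run only picks up the rows that were not handed off yet.
    PCollection<Row> pending =
        p.apply(
            "ReadPendingEvents",
            JdbcIO.readRows()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                            "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/events")
                        .withUsername("user")
                        .withPassword("secret"))
                .withQuery("SELECT * FROM event_queue WHERE processed = false"));

    // ... write `pending` to the sink, then mark the rows as processed ...
    p.run().waitUntilFinish();
  }
}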

> On 22 Apr 2022, at 13:40, Eric Berryman  wrote:
> 
> Does an unbounded JdbcIO exist, or would I need to wrap the existing one in a 
> spilttable DoFn? Or maybe there is an easier way to do it?
> 
> Thank you again,
> Eric
> 
> 
> 
> On Wed, Apr 20, 2022, 21:59 Ahmet Altay  > wrote:
> /cc @Pablo Estrada  @John Casey 
>  
> 
> On Wed, Apr 20, 2022 at 6:29 PM Eric Berryman  > wrote:
> Hello,
> 
> I have a rather simple use case where I would like to read a db table, which 
> acts as a queue (~ hundreds millions events in initial load, but only 
> thousands of events per day), and write that data out to a sink. This 
> pipeline would be unbounded.
> 
> I'm looking for reading material, and or code, which displays reading from 
> the JdbcIO API with checkpoints. I would like to avoid the initial load on 
> restarts, upgrades, etc. :)
> 
> Thank you for your time!
> Eric



Re: [Code Question] Pcollection to List using Java sdk

2022-04-21 Thread Alexey Romanenko
In this case, if you already know that the size of your result is quite small 
and fits into memory, then you need to materialise your results on one worker in 
the same JVM. You can do that by assigning the same key to all result elements and 
then applying a GroupByKey transform over the resulting keyed 
PCollection<KV<K, V>>. Alternatively, you can use the GroupIntoBatches transform (see 
the example in its Javadoc [1]) for better control over this.

[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
 
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java>

—
Alexey
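
As an illustration of the first (single key + GroupByKey) approach, a minimal
sketch, assuming the SqlTransform output is a PCollection<Row>; the mailing
class call at the end is a hypothetical placeholder.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

class MailSmallResult {
  static void sendResultsByMail(PCollection<Row> sqlResult) {
    sqlResult
        // Same constant key for every element, so they all land in one group.
        .apply("KeyByConstant", WithKeys.of("all"))
        .apply("CollectOnOneWorker", GroupByKey.create())
        .apply("BuildAndSendMail", ParDo.of(new DoFn<KV<String, Iterable<Row>>, Void>() {
          @ProcessElement
          public void processElement(@Element KV<String, Iterable<Row>> element) {
            List<Row> rows = new ArrayList<>();
            element.getValue().forEach(rows::add);
            // Build the tabular mail body from `rows` and call the mailing class
            // here, e.g. mailer.send(formatAsTable(rows)); (hypothetical helper).
          }
        }));
  }
}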

> On 21 Apr 2022, at 14:31, Kayal P  wrote:
> 
> Hi Alexey,
> 
> I have a small result PCollection<Row> from SQLTransform. I have to 
> pass this result to a mailing class that sends mail, with the body as values from 
> the PCollection<Row>, in a tabular format. The number of elements in the 
> PCollection<Row> will always be less than 10.
> 
> Regards,
> Kayal
> 
>> 
>> On Apr 21, 2022, at 5:13 AM, Alexey Romanenko  
>> wrote:
>> 
>> Hi Kayal,
>> 
>> In general, a PCollection is a potentially infinite collection of elements. So, 
>> there is no single simple way to do what you are asking, and the solution will 
>> depend on the case where it’s needed.
>> 
>> Could you give an example of why and where in your pipeline you need this? 
>> 
>> —
>> Alexey
>> 
>>> On 21 Apr 2022, at 06:28, Kayal P >> <mailto:kayalpaarthi...@gmail.com>> wrote:
>>> 
>>> Hi All,
>>> 
>>> I am trying to convert Pcollection to List using Java sdk. 
>>> Seems there is combiners.ToList transform available in python sdk. Is there 
>>> any similar option available in Java sdk? If not can someone guide me with 
>>> right way of doing this? The Pcollection is very small collection 
>>> less than 10 items. Thanks in advance.
>>> 
>>> Regards,
>>> Kayal 
>> 



Re: [Code Question] Pcollection to List using Java sdk

2022-04-21 Thread Alexey Romanenko
Hi Kayal,

In general, a PCollection is a potentially infinite collection of elements. So, 
there is no single simple way to do what you are asking, and the solution will 
depend on the case where it’s needed.

Could you give an example of why and where in your pipeline you need this? 

—
Alexey

> On 21 Apr 2022, at 06:28, Kayal P  wrote:
> 
> Hi All,
> 
> I am trying to convert Pcollection to List using Java sdk. 
> Seems there is combiners.ToList transform available in python sdk. Is there 
> any similar option available in Java sdk? If not can someone guide me with 
> right way of doing this? The Pcollection is very small collection 
> less than 10 items. Thanks in advance.
> 
> Regards,
> Kayal 



Re: [PROPOSAL] Stop Spark2 support in Spark Runner

2022-04-20 Thread Alexey Romanenko
Hi everyone,

A while ago, we already discussed on dev@ that there are several reasons to 
stop providing support for Spark 2 in the Spark Runner (in all its variants that we 
have for now - RDD, Dataset, Portable) [1]. In short, it brings a maintenance burden 
to Spark runner support that we would like to avoid in the future.

From the devs’ perspective I don’t see any objections to this. So, I’d like 
to know if there are users that still use Spark 2 for their Beam pipelines and 
for whom it would be critical to keep using it.

Please share your opinion on this!

—
Alexey

[1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco

> On 31 Mar 2022, at 17:51, Alexey Romanenko  wrote:
> 
> Hi everyone,
> 
> For the moment, Beam Spark Runner supports two versions of Spark - 2.x and 
> 3.x. 
> 
> Taking into account several things:
> - almost all cloud providers have already mostly moved to Spark 3.x as the main 
> supported version;
> - the latest Spark 2.x release (Spark 2.4.8, a maintenance release) was done 
> almost a year ago;
> - Spark 3 is considered the mainstream Spark version for development and bug 
> fixing;
> - it is better to avoid the maintenance burden of two versions (there are some 
> incompatibilities between Spark 2 and 3);
> 
> I’d suggest stopping support for Spark 2 in the Spark Runner in one of the 
> next Beam releases. 
> 
> What are your thoughts on this? Are there any principal objections or reasons 
> for not doing this that I probably missed?
> 
> —
> Alexey 
> 
> 



Re: KafkaIO consumer rate

2022-04-11 Thread Alexey Romanenko
Hi Sigalit,

Could you try to run your test pipeline with the 
“--experiments=use_deprecated_read” option and see if there is a difference?

—
Alexey
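
For reference, a minimal sketch of enabling that experiment programmatically
(equivalent to passing the flag on the command line); the rest of the pipeline
is elided and unchanged.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseDeprecatedReadExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Same effect as adding --experiments=use_deprecated_read to the command line.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_deprecated_read");

    Pipeline pipeline = Pipeline.create(options);
    // ... apply the same KafkaIO read and logging transforms as before ...
    pipeline.run().waitUntilFinish();
  }
}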

> On 10 Apr 2022, at 21:14, Sigalit Eliazov  wrote:
> 
> Hi all
> I saw a very low rate when consuming messages from Kafka in our different 
> jobs.
> In order to find the bottleneck I created 
> a very simple pipeline that reads string messages from Kafka and just prints 
> the output.
> The pipeline runs over flink cluster with the following setup:
> 1 task manager, 3 slots, parallelism set to 3
> 
> 
> PCollection<KV<String, String>> readFromKafka = 
> pipeline.apply("readFromKafka",
> KafkaTransform.readStrFromKafka(
> pipelineUtil.getBootstrapServers(), topic_name, 
> consumer_group));
> readFromKafka.apply("Get message contents", Values.create())
> .apply("Log messages", 
> MapElements.into(TypeDescriptor.of(String.class))
> .via(message -> {
> log.atInfo().log("Received: {}", message);
> return message;
> }));
> 
> the kafka consumer is:
> return KafkaIO.<String, String>read()
> .withBootstrapServers(bootstrapServers)
> .withTopic(topic)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> .withConsumerConfigUpdates((ImmutableMap.of(
> "auto.offset.reset", "earliest",
> ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)))
> .withoutMetadata();
> 
> according to the metrics it seems that i do  have 3 threads that read from 
> kafka but each one reads around 56 records per second.
> per my opinion this is a very low rate.
> I am not sure I understand this behaviour.
> I have checked cpu and memory issues and they both look ok.
> Any ideas would be really appreciated
> Thanks alot
> Sigalit
> 



Re: [Bug]

2022-04-08 Thread Alexey Romanenko
*** I moved it to user@beam.apache.org since it’s rather a user-related question 
***


Hello Liu,

Could you check which versions of Jackson you have in your Spark class path 
while running a job?

—
Alexey 

> On 8 Apr 2022, at 07:01, Liu Jie  wrote:
> 
> Dear Sir/Madam,
> 
> I followed the official documentation to download the WordCount example 
> (https://beam.apache.org/get-started/quickstart-java/ 
> ), used Maven to build 
> the project, deployed spark-2.4.8-bin-hadoop2.7 locally, and followed the 
> official Spark Runner configuration 
> (https://beam.apache.org/documentation/runners/spark/ 
> ), the error was 
> reported.
> 
> Exception in thread "main" java.lang.NoSuchMethodError: 
> com.fasterxml.jackson.databind.type.TypeBindings.emptyBindings()Lcom/fasterxml/jackson/databind/type/TypeBindings;
> at 
> org.apache.beam.sdk.options.PipelineOptionsFactory.createBeanProperty(PipelineOptionsFactory.java:1706)
> 
> 
> The Spark commit cluster command is:
> ./bin/spark-submit \
>   --class org.apache.beam.examples.WordCount \
>   --master spark://yuanshu-2288H-V5:7077 \
>   
> /share/apache_beam_code/word-count-beam/target/word-count-beam-bundled-0.1.jar
>  \
> --runner=SparkRunner \
> --inputFile=/share/k_means_data_python/1_point_4_center_2_feature.txt \
> --output=/share/apache_beam_code/word-count-beam/src/main/java/BeamKmeans/4core6G1worker/output
> 
> The pom.xml file and full error output in Maven are in the attachment.
> 
> I created the question on StackOverflow:
> https://stackoverflow.com/questions/71791853/apache-beam-2-37-0-sparkrunner-program-executing-in-spark-2-4-8-standalone-clust
>  
> 
> 
> Looking forward to your reply
>  
> -- 
> Kind regards,
> Jie Liu
> 
> Mr. Jie Liu
> Computer School, University of South China, Hengyang 421001
> E-Mail: jieliu5...@gmail.com 
> 



Re: Need help with designing a beam pipeline for data enrichment

2022-03-31 Thread Alexey Romanenko
Hi Johannes,

Agreed, it looks like a general data processing problem where you need to enrich 
a small dataset with data from a large one. There can be different 
solutions depending on your environment and available resources.

At first glance, your proposal sounds reasonable to me - so it’s mostly a 
question of how to optimise the number of requests to fetch the required 
processed entries. In this case, if you have some shard information, you can 
use it to group your input entries into bundles and do one request per bundle 
(and per shard) instead of one for every input record.

Additionally, you may consider using a Bloom filter to pre-check whether a needed 
record was already processed. The advantage of this is that the size of that 
data structure is quite small even for a large number of keys, so it can be 
easily loaded up front. Though, obviously, you would need to update it with every 
batch run.

What kind of storage do you use to keep the already processed data? Does it 
provide O(1) access to a specific record or group of records?
What are the approximate sizes of your datasets?

—
Alexey
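
To make the "one query per bundle" idea concrete, a minimal sketch, assuming the
new entries are already keyed by a shard key and that the batch lookup against
the store of processed entries is a hypothetical helper.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class BatchedEnrichment {
  static PCollection<KV<String, String>> enrich(PCollection<KV<String, String>> keyedNewEntries) {
    return keyedNewEntries
        // Collect up to 100 new entries per shard key before doing a lookup.
        .apply("BatchByShard", GroupIntoBatches.<String, String>ofSize(100))
        .apply("LookupAndMerge", ParDo.of(
            new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                String shardKey = c.element().getKey();
                List<String> batch = new ArrayList<>();
                c.element().getValue().forEach(batch::add);
                // One query for the whole batch against the store of already
                // processed entries (hypothetical helper), instead of one per record:
                // Map<String, String> historical = lookupBatch(shardKey, batch);
                for (String entry : batch) {
                  // Merge `entry` with its historical counterpart before emitting.
                  c.output(KV.of(shardKey, entry));
                }
              }
            }));
  }
}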

> On 31 Mar 2022, at 11:57, Johannes Frey  wrote:
> 
> Hi Everybody,
> 
> I'm currently facing an issue where I'm not sure how to design it
> using apache beam.
> I'm batch processing data, it's about 300k entries per day. After
> doing some aggregations the results are about 60k entries.
> 
> The issue that I'm facing now is that the entries from that batch may
> be related to entries already processed at some time in the past and
> if they are, I would need to fetch the already processed record from
> the past and merge it with the new record.
> 
> To make matters worse the "window" of that relationship might be
> several years, so I can't just sideload the last few days worth of
> data and catch all the relationships, I would need to on each batch
> run load all the already processed entries which seems not to be a
> good idea ;-)
> 
> I also think that issuing 60k queries to always fetch the relevant
> related entry from the past for each new entry is a good idea. I could
> try to "window" it tho and group them by let's say 100 entries and
> fire a query to fetch the 100 old entries for the current 100
> processed entries... that would at least reduce the amount of queries
> by 60k/100.
> 
> Are there any other good ways to solve issues like that? I would
> imagine that those situations should be quite common. Maybe there are
> some best practices around this issue.
> 
> It's basically enriching already processed entries with information
> from new entries.
> 
> Would be great if someone could point me in the right direction or
> give me some more keywords that I can google.
> 
> Thanks and regards
> Jo



Re: Support null values in kafkaIO

2022-03-28 Thread Alexey Romanenko
Thank you for working on this, John! The case of null keys/values seems to come up 
quite often.

—
Alexey

> On 28 Mar 2022, at 21:58, John Casey  wrote:
> 
> Unfortunately, there isn't a workaround at the moment. I'm nearing completion 
> of the fix.
> 
> https://issues.apache.org/jira/projects/BEAM/issues/BEAM-10529 
> 
> 
> John
> 
> On Mon, Mar 28, 2022 at 12:21 PM Brian Hulette  > wrote:
> Hi Abdelhakim,
> 
> +John Casey  is working on a fix [1] for 
> BEAM-10529 now. I'm not aware of a workaround but maybe John knows of one.
> 
> Brian
> 
> [1] https://github.com/apache/beam/pull/16923 
> 
> On Mon, Mar 28, 2022 at 6:50 AM Abdelhakim Bendjabeur 
>  > wrote:
> Hello,
> 
> I am trying to build a pipeline using Beam's Python SDK to run on Dataflow 
> and I encountered an error when encoding Null value message coming from kafka 
> (tombstone message)
> 
> ```
> Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null 
> byte[]
> ```
> 
> It seems unsupported for the moment, as I saw here 
> 
> 
> Is there a workaround for this? 
> To avoid having errors pop up each time a null-value message arrives?
> Or to bypass these events?
> 
> jira ticket that might be related here 
> 
> slack message here 
> 
> 
> Kind regards,
> Abdelhakim Bendjabeur
> Data Engineer @gorgias <>


Re: [Question] Spark: standard setup to use beam-spark to parallelize python code

2022-03-28 Thread Alexey Romanenko

> On 28 Mar 2022, at 20:58, Mihai Alexe  wrote:
> 
> the jackson runtime dependencies should be updated manually (at least to 
> 2.9.2) in case of using Spark 2.x
>  
> yes - that is exactly what we are looking to achieve, any hints about how to 
> do that? We’re not Java experts. Do you happen to have a CI recipe or binary 
> lis for this particular configuration? Thank you!

For our testing pipelines, which run on Spark 2.4.7, we just build them with a 
recent version of the Jackson libs [1]. Though, IIUC, you don’t build any Java 
code. So, what actually is the issue that you are facing? 

> use Spark 3.x if possible since it already provides Jackson jars of version 
> 2.10.0.
>  
> we tried this too but ran into other compatibility problems. Seems that the 
> Beam Spark runner (in v 2.37.0) only supports the Spark 2.x branch, as per 
> the Beam docs https://beam.apache.org/documentation/runners/spark/ 
> <https://beam.apache.org/documentation/runners/spark/>
Which Spark 3.x version exactly did you try? AFAICT, Beam 2.37.0 supports and 
was tested with Spark 3.1.2 / Scala 2.12 artifacts.

—
Alexey

[1] 
https://github.com/Talend/beam-samples/blob/9288606495b9ba8f77383cd9709ed9b5783deeb8/pom.xml#L66

>  
> any ideas?
>  
> On 2022/03/28 17:38:13 Alexey Romanenko wrote:
> > Well, it’s caused by the recent Jackson version update in Beam [1] - so the 
> > Jackson runtime dependencies should be updated manually (at least to 2.9.2) 
> > in case of using Spark 2.x.
> > 
> > Alternatively, use Spark 3.x if possible since it already provides Jackson 
> > jars of version 2.10.0.
> >  
> > [1] 
> > https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c
> >  
> > <https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c><https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c>
> >  
> > <https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c%3e>
> > 
> > —
> > Alexey
> > 
> > > On 28 Mar 2022, at 14:15, Florian Pinault  > > <mailto:fl...@ecmwf.int>> wrote:
> > > 
> > > Greetings,
> > >  
> > > We are setting up an Apache Beam cluster using Spark as a backend to run 
> > > python code. This is currently a toy example with 4 virtual machines 
> > > running Centos (a client, a spark main, and two spark-workers).
> > > We are running into version issues (detail below) and would need help on 
> > > which versions to set up.
> > > We currently are trying spark-2.4.8-bin-hadoop2.7, with the pip package 
> > > beam 2.37.0 on the client, and using a job-server to create docker image.
> > >  
> > > I saw here https://beam.apache.org/blog/beam-2.33.0/ 
> > > <https://beam.apache.org/blog/beam-2.33.0/> 
> > > <https://beam.apache.org/blog/beam-2.33.0/> 
> > > <https://beam.apache.org/blog/beam-2.33.0/%3e> that "Spark 2.x users will 
> > > need to update Spark's Jackson runtime dependencies 
> > > (spark.jackson.version) to at least version 2.9.2, due to Beam updating 
> > > its dependencies." 
> > >  But it looks like the jackson-core version in the job-server is 2.13.0 
> > > whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are
> > > -. 1 mluser mluser 46986 May 8 2021 jackson-annotations-2.6.7.jar
> > > -. 1 mluser mluser 258919 May 8 2021 jackson-core-2.6.7.jar
> > > -. 1 mluser mluser 232248 May 8 2021 jackson-core-asl-1.9.13.jar
> > > -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> > > -. 1 mluser mluser 320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> > > -. 1 mluser mluser 18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> > > -. 1 mluser mluser 780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> > > -. 1 mluser mluser 32612 May 8 2021 
> > > jackson-module-jaxb-annotations-2.6.7.jar
> > > -. 1 mluser mluser 42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> > > -. 1 mluser mluser 515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
> > >  
> > > There must be something to update, but I am not sure how to update these 
> > > jar files with their dependencies, and not sure if this would get us very 
> > > far.
> > >  
> > > Would you have a list of binaries that work together or some running CI 
> > > from the apache foundation similar to what we are trying to achieve?
> > 
> > 



Re: [Question] Spark: standard setup to use beam-spark to parallelize python code

2022-03-28 Thread Alexey Romanenko
Well, it’s caused by the recent Jackson version update in Beam [1] - so the 
Jackson runtime dependencies should be updated manually (at least to 2.9.2) in 
case of using Spark 2.x. 

Alternatively, use Spark 3.x if possible since it already provides Jackson jars of 
version 2.10.0.
 
[1] 
https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c 


—
Alexey

> On 28 Mar 2022, at 14:15, Florian Pinault  wrote:
> 
> Greetings,
>  
> We are setting up an Apache Beam cluster using Spark as a backend to run 
> python code. This is currently a toy example with 4 virtual machines running 
> Centos (a client, a spark main, and two spark-workers). 
> We are running into version issues (detail below) and would need help on 
> which versions to set up.
> We currently are trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 
> 2.37.0 on the client, and using a job-server to create docker image.
>  
> I saw here https://beam.apache.org/blog/beam-2.33.0/ 
>  that "Spark 2.x users will need 
> to update Spark's Jackson runtime dependencies (spark.jackson.version) to at 
> least version 2.9.2, due to Beam updating its dependencies." 
>  But it looks like the jackson-core version in the job-server is 2.13.0 
> whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are
> -. 1 mluser mluser 46986 May 8 2021 jackson-annotations-2.6.7.jar
> -. 1 mluser mluser 258919 May 8 2021 jackson-core-2.6.7.jar
> -. 1 mluser mluser 232248 May 8 2021 jackson-core-asl-1.9.13.jar
> -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> -. 1 mluser mluser 320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> -. 1 mluser mluser 18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> -. 1 mluser mluser 780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> -. 1 mluser mluser 32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
> -. 1 mluser mluser 42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> -. 1 mluser mluser 515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
>  
> There must be something to update, but I am not sure how to update these jar 
> files with their dependencies, and not sure if this would get us very far.
>  
> Would you have a list of binaries that work together or some running CI from 
> the apache foundation similar to what we are trying to achieve?



Re: Write S3 File with CannedACL

2022-03-10 Thread Alexey Romanenko
Contributions are very welcome! So, if you decide to go forward with this, 
please take a look at these guides [1][2]. 
In short, I can assign this Jira [3] to you, and once you have a fix, just 
submit a PR and ask for a review by pinging me or Moritz (cc). 

Please, don’t hesitate to ask any questions if you have.

—
Alexey

[1] https://beam.apache.org/contribute/ <https://beam.apache.org/contribute/>
[2] https://cwiki.apache.org/confluence/display/BEAM/Developer+Guides 
<https://cwiki.apache.org/confluence/display/BEAM/Developer+Guides>
[3] https://issues.apache.org/jira/browse/BEAM-10850 
<https://issues.apache.org/jira/browse/BEAM-10850>

> On 9 Mar 2022, at 19:33, Yushu Yao  wrote:
> 
> As long as we can use AvroIO to save files to "s3://xx" we are fine. 
> Looks like the JIRA has been around for a while. What is the procedure to 
> contribute from our end? 
> 
> Thanks 
> 
> On Wed, Mar 9, 2022 at 9:59 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Yushu,
> 
> I’m not sure that we have a workaround for that since the related jira issue 
> [1] is still open.
> 
> Side question: are you interested only in multipart version or both?
> 
> —
> Alexey
> 
> [1] https://issues.apache.org/jira/browse/BEAM-10850 
> <https://issues.apache.org/jira/browse/BEAM-10850>
> 
>> On 9 Mar 2022, at 00:19, Yushu Yao > <mailto:yao.yu...@gmail.com>> wrote:
>> 
>> Hi Team, 
>> We have a use case that needs to add `--acl bucket-owner-full-control` 
>> whenever we want to upload a file to S3. So if we want to use aws cli, it 
>> will be:
>> 
>> aws s3 cp myfile s3://bucket/path/file <> --acl bucket-owner-full-control
>> 
>> So to do it in java code, we use (assuming aws s3 sdk v1):
>> 
>> InitiateMultipartUploadRequest.withCannedACL(CannedAccessControlList.BucketOwnerFullControl)
>> 
>> In the beam code below it seems cannedACL is not supported: 
>> https://github.com/apache/beam/blob/95a5c26c4c6d0a1c4155ad209b61e623781c47df/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java#L485
>>  
>> <https://github.com/apache/beam/blob/95a5c26c4c6d0a1c4155ad209b61e623781c47df/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java#L485>
>> 
>> It's similar in the s3 sdk v2 beam code as well. 
>> 
>> Wondering how do I work around this? 
>> 
>> Thanks a lot!
>> -Yushu
>> 
> 



Re: Write S3 File with CannedACL

2022-03-09 Thread Alexey Romanenko
Hi Yushu,

I’m not sure that we have a workaround for that since the related jira issue 
[1] is still open.

Side question: are you interested only in multipart version or both?

—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-10850

> On 9 Mar 2022, at 00:19, Yushu Yao  wrote:
> 
> Hi Team, 
> We have a use case that needs to add `--acl bucket-owner-full-control` 
> whenever we want to upload a file to S3. So if we want to use aws cli, it 
> will be:
> 
> aws s3 cp myfile s3://bucket/path/file --acl bucket-owner-full-control
> 
> So to do it in java code, we use (assuming aws s3 sdk v1):
> 
> InitiateMultipartUploadRequest.withCannedACL(CannedAccessControlList.BucketOwnerFullControl)
> 
> In the beam code below it seems cannedACL is not supported: 
> https://github.com/apache/beam/blob/95a5c26c4c6d0a1c4155ad209b61e623781c47df/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java#L485
>  
> 
> 
> It's similar in the s3 sdk v2 beam code as well. 
> 
> Wondering how do I work around this? 
> 
> Thanks a lot!
> -Yushu
> 



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-02 Thread Alexey Romanenko
ofiler.scala:110)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
> at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.Task.run(Task.scala:93)
> at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
> at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:683)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 
> On Tue, Feb 1, 2022 at 12:07 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Well, personally I didn’t test with this version, but it should be fine… 
> Can you enable debug logs to check what’s happening there? 
> Can you make sure that there is no issue with firewall or something? 
> Can you run this pipeline locally against a real Kafka server, not Azure 
> Event Hub, to make sure that it works fine?
> Otherwise, you would need to debug the worker process remotely.
> 
>> On 1 Feb 2022, at 19:18, Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote:
>> 
>> Sorry I sent the last message in a hurry. Here is the Beam java to kafka: Is 
>> something missing here?
>> 
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>>   <version>2.35.0</version>
>> </dependency>
>> 
>> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote:
>> Here it is 
>> 
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-clients</artifactId>
>>   <version>2.8.0</version>
>> </dependency>
>> 
>> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko > <mailto:aromanenko@gmail.com>> wrote:
>> Hmm, this is strange. Which version of Kafka client do you use while running 
>> it with Beam?
>> 
>>> On 1 Feb 2022, at 16:56, Utkarsh Parekh >> <mailto:utkarsh.s.par...@gmail.com>> wrote:
>>> 
>>> Hi Alexey, 
>>> 
>>> First of all, thank you for the response! Yes I did have it in Consumer 
>>> configuration and try to increase "session.timeout".
>>> 
>>> From consumer side so far I've following settings:
>>> props.put("sasl.mechanism", SASL_MECHANISM);
>>> props.put("security.protocol", SECURITY_PROTOCOL);
>>> props.put("sasl.jaas.config", saslJaasConfig);
>>> props.put("request.timeout.ms <http://request.timeout.ms/>", 6);
>>> props.put("session.timeout.ms <http://session.timeout.ms/>", 6);
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
>>> AUTO_OFFSET_RESET_CONFIG);
>>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>> 
>>> It works fine using following code in Databricks Notebook. The problem has 
>>> been occurring when I run it through Apache beam and KafkaIO (Just 
>>> providing more context if that may help you to understand problem)
>>> 
>>> val df = spark.readStream
>>> .format("kafka")
>>> .option("subscribe", TOPIC)
>>> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>> .option("kafka.sasl.mechanism", "PLAIN")
>>> .option("kafka.security.protocol", "SASL_SSL")
>>> .option("kafka.sasl.jaas.config", EH_SASL)
>>> .option("kafka.request.timeout.ms <http://kafka.request.timeout.ms/>", 
>>> "6")
>>> .option("kafka.session.timeout.ms <http://kafka.session.timeout.ms/>", 
>>> "6")
>>> .option("failOnDataLoss", "false")
>>> //.option("kafka.group.id <http://kafka.group.id/>", "testsink")
>>> .option("startingOffsets", "latest")
>>> .load()
>>> 
>>> Utkarsh
>>> 
>>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko >> <mailto:aromanenko@gmail.com>> wrote:
>>> Hi Utkarsh,
>>> 
>>> Can it be related to this configuration problem?
>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>  
>>> <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received>
>>> 
>>> Did you check timeout settings?
>>> 
>>> —
>>> Alexey  
>>> 
>>> 
>>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh >>> <mailto:utkarsh.s.par...@gmail.com>> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm 
>>>> trying to create a simple streaming app with Apache Beam, where it reads 
>>>> data from an Azure event hub and produces messages into another Azure 
>>>> event hub. 
>>>> 
>>>> I'm creating and running spark jobs on Azure Databricks.
>>>> 
>>>> The problem is the consumer (uses SparkRunner) is not able to read data 
>>>> from Event hub (queue). There is no activity and no errors on the Spark 
>>>> cluster.
>>>> 
>>>> I would appreciate it if anyone could help to fix this issue.
>>>> 
>>>> Thank you
>>>> 
>>>> Utkarsh
>>> 
>> 
> 



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Alexey Romanenko
Well, personally I didn’t test with this version, but it should be fine… 
Can you enable debug logs to check what’s happening there? 
Can you make sure that there is no issue with firewall or something? 
Can you run this pipeline locally against a real Kafka server, not Azure Event 
Hub, to make sure that it works fine?
Otherwise, you would need to debug the worker process remotely.

> On 1 Feb 2022, at 19:18, Utkarsh Parekh  wrote:
> 
> Sorry I sent the last message in a hurry. Here is the Beam java to kafka: Is 
> something missing here?
> 
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.35.0</version>
> </dependency>
> 
> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh  <mailto:utkarsh.s.par...@gmail.com>> wrote:
> Here it is 
> 
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>2.8.0</version>
> </dependency>
> 
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hmm, this is strange. Which version of Kafka client do you use while running 
> it with Beam?
> 
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote:
>> 
>> Hi Alexey, 
>> 
>> First of all, thank you for the response! Yes I did have it in Consumer 
>> configuration and try to increase "session.timeout".
>> 
>> From consumer side so far I've following settings:
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms <http://request.timeout.ms/>", 6);
>> props.put("session.timeout.ms <http://session.timeout.ms/>", 6);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>> 
>> It works fine using following code in Databricks Notebook. The problem has 
>> been occurring when I run it through Apache beam and KafkaIO (Just providing 
>> more context if that may help you to understand problem)
>> 
>> val df = spark.readStream
>> .format("kafka")
>> .option("subscribe", TOPIC)
>> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>> .option("kafka.sasl.mechanism", "PLAIN")
>> .option("kafka.security.protocol", "SASL_SSL")
>> .option("kafka.sasl.jaas.config", EH_SASL)
>> .option("kafka.request.timeout.ms <http://kafka.request.timeout.ms/>", 
>> "6")
>> .option("kafka.session.timeout.ms <http://kafka.session.timeout.ms/>", 
>> "6")
>> .option("failOnDataLoss", "false")
>> //.option("kafka.group.id <http://kafka.group.id/>", "testsink")
>> .option("startingOffsets", "latest")
>> .load()
>> 
>> Utkarsh
>> 
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko > <mailto:aromanenko@gmail.com>> wrote:
>> Hi Utkarsh,
>> 
>> Can it be related to this configuration problem?
>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>  
>> <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received>
>> 
>> Did you check timeout settings?
>> 
>> —
>> Alexey   
>> 
>> 
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh >> <mailto:utkarsh.s.par...@gmail.com>> wrote:
>>> 
>>> Hello,
>>> 
>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying 
>>> to create a simple streaming app with Apache Beam, where it reads data from 
>>> an Azure event hub and produces messages into another Azure event hub. 
>>> 
>>> I'm creating and running spark jobs on Azure Databricks.
>>> 
>>> The problem is the consumer (uses SparkRunner) is not able to read data 
>>> from Event hub (queue). There is no activity and no errors on the Spark 
>>> cluster.
>>> 
>>> I would appreciate it if anyone could help to fix this issue.
>>> 
>>> Thank you
>>> 
>>> Utkarsh
>> 
> 



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Alexey Romanenko
Hmm, this is strange. Which version of Kafka client do you use while running it 
with Beam?

> On 1 Feb 2022, at 16:56, Utkarsh Parekh  wrote:
> 
> Hi Alexey, 
> 
> First of all, thank you for the response! Yes I did have it in Consumer 
> configuration and try to increase "session.timeout".
> 
> From consumer side so far I've following settings:
> props.put("sasl.mechanism", SASL_MECHANISM);
> props.put("security.protocol", SECURITY_PROTOCOL);
> props.put("sasl.jaas.config", saslJaasConfig);
> props.put("request.timeout.ms <http://request.timeout.ms/>", 6);
> props.put("session.timeout.ms <http://session.timeout.ms/>", 6);
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
> 
> It works fine using following code in Databricks Notebook. The problem has 
> been occurring when I run it through Apache beam and KafkaIO (Just providing 
> more context if that may help you to understand problem)
> 
> val df = spark.readStream
> .format("kafka")
> .option("subscribe", TOPIC)
> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
> .option("kafka.sasl.mechanism", "PLAIN")
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.jaas.config", EH_SASL)
> .option("kafka.request.timeout.ms <http://kafka.request.timeout.ms/>", 
> "6")
> .option("kafka.session.timeout.ms <http://kafka.session.timeout.ms/>", 
> "6")
> .option("failOnDataLoss", "false")
> //.option("kafka.group.id <http://kafka.group.id/>", "testsink")
> .option("startingOffsets", "latest")
> .load()
> 
> Utkarsh
> 
> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Utkarsh,
> 
> Can it be related to this configuration problem?
> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>  
> <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received>
> 
> Did you check timeout settings?
> 
> —
> Alexey
> 
> 
>> On 1 Feb 2022, at 02:27, Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote:
>> 
>> Hello,
>> 
>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying 
>> to create a simple streaming app with Apache Beam, where it reads data from 
>> an Azure event hub and produces messages into another Azure event hub. 
>> 
>> I'm creating and running spark jobs on Azure Databricks.
>> 
>> The problem is the consumer (uses SparkRunner) is not able to read data from 
>> Event hub (queue). There is no activity and no errors on the Spark cluster.
>> 
>> I would appreciate it if anyone could help to fix this issue.
>> 
>> Thank you
>> 
>> Utkarsh
> 



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Alexey Romanenko
Hi Utkarsh,

Can it be related to this configuration problem?
https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
 


Did you check timeout settings?

—
Alexey  
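
For reference, a minimal sketch of how the Event Hubs Kafka-endpoint settings
discussed in this thread would be passed to KafkaIO via withConsumerConfigUpdates;
the bootstrap servers, topic, JAAS string, group id and timeout values below are
placeholders and illustrative only.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

class EventHubKafkaRead {
  static PTransform<PBegin, PCollection<KV<String, String>>> readFromEventHub(
      String bootstrapServers, String topic, String saslJaasConfig) {
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("sasl.mechanism", "PLAIN");
    consumerConfig.put("security.protocol", "SASL_SSL");
    consumerConfig.put("sasl.jaas.config", saslJaasConfig);
    consumerConfig.put("request.timeout.ms", 60000);
    consumerConfig.put("session.timeout.ms", 60000);
    consumerConfig.put("auto.offset.reset", "earliest");
    consumerConfig.put("group.id", "beam-eventhub-consumer");

    return KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(consumerConfig)
        .withoutMetadata();
  }
}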


> On 1 Feb 2022, at 02:27, Utkarsh Parekh  wrote:
> 
> Hello,
> 
> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying 
> to create a simple streaming app with Apache Beam, where it reads data from 
> an Azure event hub and produces messages into another Azure event hub. 
> 
> I'm creating and running spark jobs on Azure Databricks.
> 
> The problem is the consumer (uses SparkRunner) is not able to read data from 
> Event hub (queue). There is no activity and no errors on the Spark cluster.
> 
> I would appreciate it if anyone could help to fix this issue.
> 
> Thank you
> 
> Utkarsh



Re: [Question] Can Apache beam used for chaining IO transforms

2022-01-27 Thread Alexey Romanenko
Hi Wei,

Thanks for the details! Yes, definitely, let’s chat about this in the related Jira 
issue.

—
Alexey

> On 27 Jan 2022, at 00:13, Wei Hsia ☁  wrote:
> 
> Hi Alexey, 
> 
> Thanks Cham! 
> 
> Sorry, i've been working on this in a vacuum, my apologies. 
> 
> My approach was slightly different, rather than write().withResults() - we 
> were taking the approach of another (which is not as ideal but the least 
> intrusive) creating another class WriteRecordsWithOutput and having 
> WriteRecords delegate to the new class. 
> It's the jira issue you attached to yours (I didn't really follow protocols 
> as I was just starting to venture into this). 
> Partially it's because having the ProducerRecord would be beneficial for the 
> use case and changing the signature on the public class WriteRecords can't be 
> an overnight change. 
> 
> Happy to chat about it - I'd guess I'm halfway done but if you've got 
> something I'd love to chat about it. 
> 
> Thanks,
> 
> Wei
> 
> 
> 
> 
> Wei Hsia
> Customer Engineer, Analytics Specialist
> Google Cloud
> weih...@google.com <mailto:weih...@google.com>
> 949.794.2004
> 
> 
> On Wed, Jan 26, 2022 at 2:22 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Thanks Cham but actually we already had a similar open issue for this [1] and 
> I’m currently working on that one. Also, I created an umbrella Jira for such 
> tasks for all other Java SDK IOs since I believe this behaviour can be quite 
> demanded in general [2] 
> 
> I’d be happy to discuss it in more details.
> 
> —
> Alexey
> 
> [1] https://issues.apache.org/jira/browse/BEAM-13298 
> <https://issues.apache.org/jira/browse/BEAM-13298>
> [2] https://issues.apache.org/jira/browse/BEAM-13584 
> <https://issues.apache.org/jira/browse/BEAM-13584>
> 
> 
> 
> 
>> On 26 Jan 2022, at 19:38, Chamikara Jayalath > <mailto:chamik...@google.com>> wrote:
>> 
>> Yeah, I think Wait transform will not work here as well. We have to replace 
>> Kafka write transform's PDone output with a proper write result. I think 
>> +Wei Hsia <mailto:weih...@google.com> has been looking into doing something 
>> like this. Created https://issues.apache.org/jira/browse/BEAM-13748 
>> <https://issues.apache.org/jira/browse/BEAM-13748> for tracking.
>> 
>> Thanks,
>> Cham
>> 
>> On Wed, Jan 26, 2022 at 10:32 AM Yomal de Silva > <mailto:yomal.prav...@gmail.com>> wrote:
>> Hi Alexey,
>> Thank you for your reply. Yes, that's the use case here. 
>> 
>> Tried the approach that you have suggested, but for the window to get 
>> triggered shouldn't we apply some transform like GroupByKey? or else the 
>> window is ignored right? As we dont have such a transform applied, we did 
>> not observe any windowing behavior in the pipeline. 
>> 
>> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko > <mailto:aromanenko@gmail.com>> wrote:
>> Hi Yomal de Silva,
>> 
>> IIUC, you need to pass downstream the records only once they were stored in 
>> DB with JdbcIO, don’t you? And your pipeline is unbounded. 
>> 
>> So, why not to split into windows your input data PCollection (if it was not 
>> done upstream) and use Wait.on() with JdbcIO.write().withResults()? This is 
>> exactly a case for which it was initially developed.
>> 
>> —
>> Alexey
>> 
>> 
>> > On 26 Jan 2022, at 10:16, Yomal de Silva > > <mailto:yomal.prav...@gmail.com>> wrote:
>> > 
>> > Hi beam team,
>> > I have been working on a data pipeline which consumes data from an SFTP 
>> > location and the data is passed through multiple transforms and finally 
>> > published to a kafka topic. During this we need to maintain the state of 
>> > the file, so we write to the database during the pipeline. 
>> > We came across that JDBCIo write is only considered as a sink and it 
>> > returns a PDone. But for our requirement we need to pass down the elements 
>> > that were persisted.
>> > 
>> > Using writeVoid is also not useful since this is an unbounded stream and 
>> > we cannot use Wait.On with that without using windows and triggers. 
>> > 
>> > Read file --> write the file state in db --> enrich data --> publish to 
>> > kafka --> update file state
>> > 
>> > The above is the basic requirement from the pipeline. Any suggestions?
>> > 
>> > Thank you
>> 
> 



Re: [Question] Can Apache beam used for chaining IO transforms

2022-01-26 Thread Alexey Romanenko
Hmm, do you mean that if you have a windowed PCollection as an input for 
"JdbcIO.write().withResults()" (the “signal" transform) and you then apply 
"Wait.on(signal)", it doesn’t work as expected [1]?

[1] 
https://github.com/apache/beam/blob/3c83a3fa0cf68e69d484d048ac8e96de37fc12d5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java#L58
  

> On 26 Jan 2022, at 19:31, Yomal de Silva  wrote:
> 
> Hi Alexey,
> Thank you for your reply. Yes, that's the use case here. 
> 
> Tried the approach that you have suggested, but for the window to get 
> triggered shouldn't we apply some transform like GroupByKey? or else the 
> window is ignored right? As we dont have such a transform applied, we did not 
> observe any windowing behavior in the pipeline. 
> 
> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Yomal de Silva,
> 
> IIUC, you need to pass downstream the records only once they were stored in 
> DB with JdbcIO, don’t you? And your pipeline is unbounded. 
> 
> So, why not to split into windows your input data PCollection (if it was not 
> done upstream) and use Wait.on() with JdbcIO.write().withResults()? This is 
> exactly a case for which it was initially developed.
> 
> —
> Alexey
> 
> 
> > On 26 Jan 2022, at 10:16, Yomal de Silva  > <mailto:yomal.prav...@gmail.com>> wrote:
> > 
> > Hi beam team,
> > I have been working on a data pipeline which consumes data from an SFTP 
> > location and the data is passed through multiple transforms and finally 
> > published to a kafka topic. During this we need to maintain the state of 
> > the file, so we write to the database during the pipeline. 
> > We came across that JDBCIo write is only considered as a sink and it 
> > returns a PDone. But for our requirement we need to pass down the elements 
> > that were persisted.
> > 
> > Using writeVoid is also not useful since this is an unbounded stream and we 
> > cannot use Wait.On with that without using windows and triggers. 
> > 
> > Read file --> write the file state in db --> enrich data --> publish to 
> > kafka --> update file state
> > 
> > The above is the basic requirement from the pipeline. Any suggestions?
> > 
> > Thank you
> 



Re: [Question] Can Apache beam used for chaining IO transforms

2022-01-26 Thread Alexey Romanenko
Thanks Cham, but actually we already had a similar open issue for this [1], and 
I’m currently working on that one. Also, I created an umbrella Jira for such 
tasks for all other Java SDK IOs, since I believe this behaviour can be in quite 
high demand in general [2]. 

I’d be happy to discuss it in more details.

—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-13298 
<https://issues.apache.org/jira/browse/BEAM-13298>
[2] https://issues.apache.org/jira/browse/BEAM-13584 
<https://issues.apache.org/jira/browse/BEAM-13584>




> On 26 Jan 2022, at 19:38, Chamikara Jayalath  wrote:
> 
> Yeah, I think Wait transform will not work here as well. We have to replace 
> Kafka write transform's PDone output with a proper write result. I think +Wei 
> Hsia <mailto:weih...@google.com> has been looking into doing something like 
> this. Created https://issues.apache.org/jira/browse/BEAM-13748 
> <https://issues.apache.org/jira/browse/BEAM-13748> for tracking.
> 
> Thanks,
> Cham
> 
> On Wed, Jan 26, 2022 at 10:32 AM Yomal de Silva  <mailto:yomal.prav...@gmail.com>> wrote:
> Hi Alexey,
> Thank you for your reply. Yes, that's the use case here. 
> 
> Tried the approach that you have suggested, but for the window to get 
> triggered shouldn't we apply some transform like GroupByKey? or else the 
> window is ignored right? As we dont have such a transform applied, we did not 
> observe any windowing behavior in the pipeline. 
> 
> On Wed, Jan 26, 2022 at 10:04 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Yomal de Silva,
> 
> IIUC, you need to pass downstream the records only once they were stored in 
> DB with JdbcIO, don’t you? And your pipeline is unbounded. 
> 
> So, why not to split into windows your input data PCollection (if it was not 
> done upstream) and use Wait.on() with JdbcIO.write().withResults()? This is 
> exactly a case for which it was initially developed.
> 
> —
> Alexey
> 
> 
> > On 26 Jan 2022, at 10:16, Yomal de Silva  > <mailto:yomal.prav...@gmail.com>> wrote:
> > 
> > Hi beam team,
> > I have been working on a data pipeline which consumes data from an SFTP 
> > location and the data is passed through multiple transforms and finally 
> > published to a kafka topic. During this we need to maintain the state of 
> > the file, so we write to the database during the pipeline. 
> > We came across that JDBCIo write is only considered as a sink and it 
> > returns a PDone. But for our requirement we need to pass down the elements 
> > that were persisted.
> > 
> > Using writeVoid is also not useful since this is an unbounded stream and we 
> > cannot use Wait.On with that without using windows and triggers. 
> > 
> > Read file --> write the file state in db --> enrich data --> publish to 
> > kafka --> update file state
> > 
> > The above is the basic requirement from the pipeline. Any suggestions?
> > 
> > Thank you
> 



Re: [Question] Can Apache beam used for chaining IO transforms

2022-01-26 Thread Alexey Romanenko
Hi Yomal de Silva,

IIUC, you need to pass the records downstream only once they were stored in the DB 
with JdbcIO, don’t you? And your pipeline is unbounded. 

So, why not split your input data PCollection into windows (if it was not 
done upstream) and use Wait.on() with JdbcIO.write().withResults()? This is 
exactly the case for which it was initially developed; a sketch follows below.

—
Alexey
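
A minimal sketch of that pattern, assuming the file-state elements are
(file name, status) pairs; the driver, connection string and table are
hypothetical placeholders.

import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WriteStateThenContinue {
  static PCollection<KV<String, String>> writeStateThenWait(
      PCollection<KV<String, String>> fileStates) {
    PCollection<KV<String, String>> windowed =
        fileStates.apply("Window1m",
            Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    // The "signal": a PCollection that is only complete once the rows are in the DB.
    PCollection<Void> written =
        windowed.apply(
            "WriteFileState",
            JdbcIO.<KV<String, String>>write()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/pipeline_state"))
                .withStatement("INSERT INTO file_state (file_name, status) VALUES (?, ?)")
                .withPreparedStatementSetter(
                    (element, statement) -> {
                      statement.setString(1, element.getKey());
                      statement.setString(2, element.getValue());
                    })
                .withResults());

    // Each window's elements are emitted downstream only after that window was written.
    return windowed.apply("WaitForDbWrite", Wait.on(written));
  }
}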
 

> On 26 Jan 2022, at 10:16, Yomal de Silva  wrote:
> 
> Hi beam team,
> I have been working on a data pipeline which consumes data from an SFTP 
> location and the data is passed through multiple transforms and finally 
> published to a kafka topic. During this we need to maintain the state of the 
> file, so we write to the database during the pipeline. 
> We came across that JDBCIo write is only considered as a sink and it returns 
> a PDone. But for our requirement we need to pass down the elements that were 
> persisted.
> 
> Using writeVoid is also not useful since this is an unbounded stream and we 
> cannot use Wait.On with that without using windows and triggers. 
> 
> Read file --> write the file state in db --> enrich data --> publish to kafka 
> --> update file state
> 
> The above is the basic requirement from the pipeline. Any suggestions?
> 
> Thank you



Re: [RFC][design] Standardizing Beam IO connectors

2022-01-25 Thread Alexey Romanenko
Thanks Pablo and others for creating such a doc, great initiative! I left my 
comments.

After all corrections and discussions, I suppose it should be added to the Beam 
Contribution Guide.

—
Alexey

> On 24 Jan 2022, at 17:00, Pablo Estrada  wrote:
> 
> Hi all!
> 
> A few of us have started putting together a proposal to try and standardize 
> IO connectors for Beam. What does this mean?
> 
> Most Beam connectors have been created by separate individuals, and they've 
> evolved over time with different features and API choices. This is fine, but 
> it makes Beam more difficult to learn as a framework, because for every new 
> IO connector, a user needs to study it and learn before picking it up.
> 
> The goal for this doc is to simplify the development, review and usage of IO 
> connectors for Beam. Would you please take a look and add your comments / 
> opinions / ideas? We'd love everyone's feedback (dev@ and user@ alike!) so 
> that we can, in the future, strive for connectors that are easier to 
> understand and develop.
> 
> PTAL: https://s.apache.org/beam-io-api-standard 
> 
> 
> Best
> -P.



Re: Testing a jvm pipeline on the portability framework locally

2022-01-18 Thread Alexey Romanenko
Yes, you can do it locally, for example, by running your pipeline via the portable 
Spark or Flink runner. See the details here [1] (select the "Portable 
(Java/Python/Go)” tab for the proper code/examples) and here [2].

And a moment of self-advertisement: I gave a talk about cross-language 
pipelines with a portable runner [3] a while ago, so it may be helpful as well.

[1] https://beam.apache.org/documentation/runners/spark/
[2] https://beam.apache.org/documentation/runners/flink/
[3] https://2020.beamsummit.org/sessions/cross-language-pipeline-python-java/

—
Alexey
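
As a minimal sketch of submitting a Java pipeline to a locally started portable
job service: the endpoint and flags below are assumptions (a Flink or Spark job
server listening on localhost:8099, PortableRunner from the
beam-runners-portability-java artifact), so adjust them to your local setup.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PortableLocalRun {
  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=PortableRunner",
                "--jobEndpoint=localhost:8099",
                "--defaultEnvironmentType=LOOPBACK")
            .create();

    Pipeline pipeline = Pipeline.create(options);
    // ... build the same transforms as in the Dataflow job ...
    pipeline.run().waitUntilFinish();
  }
}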


> On 18 Jan 2022, at 15:14, Steve Niemitz  wrote:
> 
> If I have a (jvm) pipeline, is there a simple way (ie DirectRunner) to run it 
> locally but using the portability framework?  I'm running into a lot of weird 
> bugs running it on dataflow (v2) and want to be able to run it locally for a 
> faster debug loop.



Re: Calcite/BeamSql

2022-01-11 Thread Alexey Romanenko
If I understand your problem right, you can just use JdbcIO.readRows(), which 
returns a PCollection<Row> and can be used downstream to create a 
PCollectionTuple that, in its turn, also contains the PCollection 
from your Kafka source. So, once you have a PCollectionTuple with two TupleTags 
(from Kafka and MySQL), you can apply SqlTransform over it, as sketched below.

—
Alexey
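
A minimal sketch of that wiring, assuming `kafkaRows` is already a
PCollection<Row> with a Beam schema attached (built from the KafkaIO records
upstream); the JDBC driver, URL, table and join column are placeholders.

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

class KafkaMySqlJoin {
  static PCollection<Row> joinKafkaWithMySql(PCollection<Row> kafkaRows) {
    PCollection<Row> table1 =
        kafkaRows.getPipeline().apply(
            "ReadTable1",
            JdbcIO.readRows()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.cj.jdbc.Driver", "jdbc:mysql://mysql-host:3306/mydb"))
                .withQuery("SELECT * FROM Table1"));

    // The TupleTag ids become the table names visible to the SQL query.
    return PCollectionTuple.of(new TupleTag<>("Table1"), table1)
        .and(new TupleTag<>("Kafka"), kafkaRows)
        .apply(
            "JoinBySql",
            SqlTransform.query(
                "SELECT Table1.*, Kafka.* FROM Kafka JOIN Table1 ON Table1.id = Kafka.id"));
  }
}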




> On 11 Jan 2022, at 03:54, Yushu Yao  wrote:
> 
> Thanks, Brian for the explanation. That helps a lot. 
> Now I'm clear on the Kafka source side. 
> 
> A follow-up on the other source that's in MySql. If I want to do the query:
> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key 
> 
> I can get the Kafka stream into a PCollection as you said above. 
> How about the MySql Table 1? Is there some semantic in Beam that allows me to 
> make the MySql table into a PCollection? (Or do I need to import it as a 
> PCollection? I think there is a Beam SQL Extension for it?) And does it need 
> to scan the full MySql Table1 to accomplish the above join? 
> 
> Thanks again!
> -Yushu
> 
> 
> On Mon, Jan 10, 2022 at 1:50 PM Brian Hulette  > wrote:
> Hi Yushu,
> Thanks for the questions! To process Kafka data with SqlTransform you have a 
> couple of options, you could just use KafkaIO and manually transforms the 
> records to produce a PCollection with a Schema [1], or you could use the DDL 
> to describe your kafka stream as a table [2], and query it directly with 
> SqlTransform. You can find examples of using the DDL with SqlTransform here 
> [3]. Note that the Kafka DDL supports "Generic Payload Handling", so you 
> should be able to configure it to consume JSON, proto, thrift, or avro 
> messages [4]. Would one of those work for you?
> 
> For your second question about "pushing down" the join on 2 tables: 
> unfortunately, that's not something we support right now. You'd have to do 
> that sort of optimization manually. This is something we've discussed in the 
> abstract but it's a ways off.
> 
> Brian
> 
> [1] https://beam.apache.org/documentation/programming-guide/#what-is-a-schema 
> 
> [2] 
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#kafka
>  
> 
> [3] 
> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html
>  
> 
> [4] 
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/#generic-payload-handling
>  
> 
> On Mon, Jan 10, 2022 at 12:15 PM Yushu Yao  > wrote:
> Hi Folks, 
> 
> Question from a Newbie for both Calcite and Beam:
> 
> I understand Calcite can make a tree of execution plan with relational 
> algebra and push certain operations to a "data source". And at the same time, 
> it can allow source-specific optimizations. 
> 
> I also understand that Beam SQL can run SqlTransform.query() on one or more 
> of the PCollection, and Calcite is used in coming up with the execution 
> plan. 
> 
> My question is, assume I have a MySql Table as Table1, and a Kafka Stream 
> called "Kafka". 
> 
> Now I want to do some joins like lookuping up a row based on a key in the 
> Kafka message: 
> select Table1.*, Kafka.* from Kafka join Table1 on Table1.key=Kafka.key 
> 
> What's the best way to implement this with beamSQL. (Note that we can't 
> hardcode the join because each input Kafka message may need a different SQL). 
> 
> One step further, if we have 2 MySql Tables, Table1, and Table2. And a Kafka 
> Stream "Kafka". And we want to join those 2 tables inside MySql first (and 
> maybe with aggregations like sum/count), then join with the Kafka. Is there a 
> way to tap into calcite so that the join of the 2 tables are actually pushed 
> into MySql? 
> 
> Sorry for the lengthy question and please let me know if more clarifications 
> is needed. 
> 
> Thanks a lot in advanced!
> 
> -Yushu
> 
> 
> 



Re: Reading from BigTable with Beam Python SDK

2022-01-07 Thread Alexey Romanenko
Yes, but I think it's caused by the same description being used for the Java and 
Python SDKs, while the Java connector supports both Read and Write.
So we need to reflect this there, thanks for pointing it out.

—
Alexey

> On 7 Jan 2022, at 14:50, Pierre Oberholzer  
> wrote:
> 
> Hi,
> 
> If this discussion implicitly says that there is currently no BigTable reader 
> in the Python SDK, it'd be great to reflect it in the doc [1] ;)
> 
> Thanks !
> 
> 
> 
> [1] https://beam.apache.org/documentation/io/built-in/ 
> 
> Le jeu. 6 janv. 2022 à 17:57, Luke Cwik  > a écrit :
> +1 on using cross language to get the Java Bigtable connector that already 
> exists.
> 
> You could also take a look at this other xlang documentation[1] and look at 
> an existing implementation such as kafka[2] that is xlang.
> 
> Finally there was support added to use many transforms in Java using the 
> class name and builder methods[3].
> 
> 1: https://beam.apache.org/documentation/patterns/cross-language/ 
>  
> 2: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
>  
> 
> 3: https://issues.apache.org/jira/browse/BEAM-12769 
> 
> 
> 
> On Thu, Jan 6, 2022 at 4:41 AM Sayak Paul  > wrote:
> Hi folks,
> 
> My project needs reading data from Cloud BigTable. We are aware that an IO 
> connector for BigTable is available in the Java SDK. So we could probably 
> make use of the cross-language capabilities 
> 
>  of Beam and make it work. I am, however, looking for 
> guidance/resources/pointers that could be beneficial to build a Beam pipeline 
> in Python that reads data from Cloud BigTable. Any relevant clue would be 
> greatly appreciated. 
> 
> Sayak Paul | sayak.dev 
> 
> 
> 
> -- 
> Pierre



Re: Compatibility of Spark portable runner

2022-01-05 Thread Alexey Romanenko
Generally speaking, to avoid potential issues, the versions used at compile time 
and at runtime should be the same (the Scala version is the most important one), 
but, thanks to Spark's backward compatibility, the minor versions can differ.  

> On 5 Jan 2022, at 07:50, Zheng Ni  wrote:
> 
> Hi Beam Community,
>  
> Greetings.
>  
> I am interested in submitting a spark job through portable runner. I have a 
> question about the compatibility between spark_job_server and spark cluster.
>  
> Let’s say I am going to use beam_spark_job_server of version 2.35.0. How 
> could I know which spark cluster version is compatible with it? Or could it 
> work with any versions of the spark cluster?
>  
>   <>
> Regards,
> Zheng 
> 



Re: Kafka manually commit offsets

2021-12-10 Thread Alexey Romanenko
I answered a similar question on SO a while ago [1], and I hope it helps.

“By default, pipeline.apply(KafkaIO.read()...) will return a 
PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can get an 
offset from the KafkaRecord metadata and commit it manually in the way that you need 
(just don't forget to disable AUTO_COMMIT in KafkaIO.read()).

By a manual way, I mean that you should instantiate your own Kafka client in your 
DoFn, process the input element (the KafkaRecord that was read before), fetch 
the offset from the KafkaRecord and commit it with your own client. 

Though, you need to make sure that the call to the external API and the offset 
commit are atomic, to prevent potential data loss (if it's critical)."

[1] 
https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
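
For illustration, a rough and untested sketch of such a manual-commit DoFn; the 
broker address, group id and the String key/value types are assumptions, and it 
commits per element for simplicity (batching the commits, e.g. per bundle, would 
be more efficient):

```java
class CommitOffsetsFn extends DoFn<KafkaRecord<String, String>, KafkaRecord<String, String>> {

  private transient KafkaConsumer<String, String> consumer;

  @Setup
  public void setup() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = new KafkaConsumer<>(config);
  }

  @ProcessElement
  public void processElement(@Element KafkaRecord<String, String> record,
      OutputReceiver<KafkaRecord<String, String>> out) {
    // ... call the external API here and commit only once that call has succeeded ...
    TopicPartition partition = new TopicPartition(record.getTopic(), record.getPartition());
    OffsetAndMetadata offset = new OffsetAndMetadata(record.getOffset() + 1);
    consumer.commitSync(Collections.singletonMap(partition, offset));
    out.output(record);
  }

  @Teardown
  public void teardown() {
    if (consumer != null) {
      consumer.close();
    }
  }
}
```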
 


—
Alexey

> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz  
> wrote:
> 
> Thanks Luke for your quick response. I see, that makes sense. Now I have two 
> new questions if I may: 
> a) How I can get the offsets I want to commit. My investigation now is going 
> throw getCheckpointMark(), is this correct? 
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>  
> 
> 
> b) With these offsets, I will create a client at the of the pipeline, with 
> Kafka library, and methods such as commitSync() and commitAsync(). Is this 
> correct? 
> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>  
> 
> 
> Thanks!!!
> 
> Juan 
> 
> 
> On Fri, 10 Dec 2021 at 01:07, Luke Cwik  > wrote:
> commitOffsetsInFinalize is about committing the offset after the output has 
> been durably persisted for the bundle containing the Kafka Read. The bundle 
> represents a unit of work over a subgraph of the pipeline. You will want to 
> ensure the commitOffsetsInFinalize is disabled and that the Kafka consumer 
> config doesn't auto commit automatically. This will ensure that KafkaIO.Read 
> doesn't commit the offsets. Then it is upto your PTransform to perform the 
> committing.
> 
> On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz 
> mailto:juancalvoferran...@gmail.com>> wrote:
> Morning!
> 
> First of all, thanks for all the incredible work you do, is amazing. Then, 
> secondly, I reach you for some help or guidance to manually commit records. I 
> want to do this so I can commit the record and the end of the pipeline, and 
> not in the read() of the KafkaIO.
> 
> Bearing in mind what I have read in this post: 
> https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit
>  
> 
>  , and thinking of a pipeline similar to the one described, I understand we 
> can use commitOffsetsInFinalize() to commit offsets in the read(). What I 
> don't understand is how this helps to commit the offset if we want to do this 
> at the end, not in the reading. Thanks. All comments and suggestions are 
> more than welcome. :) 
> 
> Juan Calvo Ferrándiz
> Data Engineer
> Go to LINKEDIN  
> Go to GITHUB 
> Go to MEDIUM  
> 



Re: [Question]

2021-10-28 Thread Alexey Romanenko
Hi,

You just need to create a shaded jar of your pipeline (with the SparkRunner 
included) and submit it with "spark-submit" and the CLI options "--deploy-mode 
cluster --master yarn".
Also, you need to specify "--runner=SparkRunner --sparkMaster=yarn" as pipeline 
options. 
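
For illustration, the whole submission could look something like this (the jar 
and main class names are made up):

```
spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --class com.example.MyBeamPipeline \
  my-pipeline-shaded.jar \
  --runner=SparkRunner \
  --sparkMaster=yarn
```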

—
Alexey

> On 26 Oct 2021, at 19:07, Holt Spalding  wrote:
> 
> Hello,
> 
> I've been having an incredibly difficult time running an apache beam pipeline 
> on a spark cluster in aws EMR. Spark on EMR is configured for yarn, which 
> appears to be the primary source of my issues, the documentation here: 
> https://beam.apache.org/documentation/runners/spark/ 
> 
> only seems to describe how to run beam on a cluster in Standalone mode. I've 
> tried different beam pipeline arguments, but it seems to run in local mode 
> every time after it can't find a spark master url. Has anyone run into this 
> issue, and or does anyone have suggestions or examples of how to get this to 
> work. Thank you for your help. 
> 



Re: Performance of Apache Beam

2021-10-19 Thread Alexey Romanenko
+ Azhar (just in case)

> On 18 Oct 2021, at 11:30, Jan Lukavský  wrote:
> 
> Hi Azhar,
> 
> -dev  +user 
> this kind of question cannot be answered in general. The overhead will depend 
> on the job and the SDK you use. Using Java SDK with (classical) FlinkRunner 
> should give the best performance on Flink, although the overhead will not be 
> completely nullified. The way Beam is constructed - with portability being 
> one of the main concerns - necessarily brings some overhead compared to the 
> job being written and optimized for single runner only (using Flink's native 
> API in this case). I'd suggest you evaluate the programming model and 
> portability guarantees, that Apache Beam gives you instead of pure 
> performance. On the other hand Apache Beam tries hard to minimize the 
> overhead, so you should not expect *vastly* worse performance. I'd say the 
> best way to go is to implement a simplistic Pipeline somewhat representing 
> your use-case and then measure the performance on this specific instance.
> 
> Regarding fault-tolerance and backpressure, Apache Beam model does not handle 
> those (with the exception of bundles being processed as atomic units), so 
> these are delegated to the runner - FlinkRunner will therefore behave the way 
> Apache Flink defines these concepts.
> 
> Hope this helps,
> 
>  Jan
> 
> On 10/17/21 17:53, azhar mirza wrote:
>> Hi Team
>> Could you please let me know following below answers .
>> 
>> I need to know performance of apache beam vs flink if we use flink as runner 
>> for Beam, what will be the additional overhead converting Beam to flink
>> 
>> How fault tolerance and resiliency handled in apache beam.
>> How apache beam handles backpressure?
>> 
>> Thanks
>> Azhar



Re: Perf issue with Beam on spark (spark runner)

2021-10-12 Thread Alexey Romanenko
Robert,

Do you have any numbers by chance regarding this optimisation?

Alexey

> On 5 Oct 2021, at 00:27, Robert Bradshaw  wrote:
> 
> https://github.com/apache/beam/pull/15637 
> <https://github.com/apache/beam/pull/15637> might help some.
> 
> On Thu, Sep 9, 2021 at 5:21 PM Tao Li  <mailto:t...@zillow.com>> wrote:
> Thanks Mike for this info!
> 
>  
> 
> From: Mike Kaplinskiy mailto:m...@ladderlife.com>>
> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Date: Tuesday, September 7, 2021 at 2:15 PM
> To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Cc: Alexey Romanenko  <mailto:aromanenko@gmail.com>>, Andrew Pilloud  <mailto:apill...@google.com>>, Ismaël Mejía  <mailto:ieme...@gmail.com>>, Kyle Weaver  <mailto:kcwea...@google.com>>, Yuchu Cao  <mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> A long time ago when I was experimenting with the Spark runner for a batch 
> job, I noticed that a lot of time was spend in GC as well. In my case I 
> narrowed it down to how the Spark runner implements Coders. 
> 
>  
> 
> Spark's value prop is that it only serializes data when it truly has no other 
> choice - i.e. when it needs to reclaim memory or when it sends things over 
> the wire. Unfortunately due to the mismatch in serialization APIs between 
> Beam and Spark, Beam's Spark runner actually just serializes things all the 
> time. My theory was that the to/from byte array dance was slow. I attempted 
> to fix this at https://github.com/apache/beam/pull/8371 
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fbeam%2Fpull%2F8371&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187448677%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=ZtgDb0R3gjSHVU1rpp6T0ZVl7ZhXXRhH%2BqFMX8Z1z%2Bo%3D&reserved=0>
>  but I could never actually reproduce a speedup in performance benchmarks.
> 
>  
> 
> If you're feeling up to it, you could try reviving something like that PR and 
> see if it helps.
> 
>  
> 
> Mike.
> 
> Ladder 
> <https://nam11.safelinks.protection.outlook.com/?url=http%3A%2F%2Fbit.ly%2F1VRtWfS&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187458627%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=RBjmeAAqHdrZmXZEP7ONXwXZyLOwwx6tQbST%2Bs6wq2Q%3D&reserved=0>.
>  The smart, modern way to insure your life.
> 
>  
> 
>  
> 
> On Sat, Aug 14, 2021 at 4:35 PM Tao Li  <mailto:t...@zillow.com>> wrote:
> 
> @Alexey Romanenko <mailto:aromanenko@gmail.com> I tried out ParquetIO 
> splittable and the processing time improved from 10 min to 6 min, but still 
> much longer than 2 min using a native spark app.
> 
>  
> 
> We are still seeing a lot of GC cost from below call stack. Do you think this 
> ticket can fix this issue https://issues.apache.org/jira/browse/BEAM-12646 
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cfc203509e0994ed0fc8b08d9724473bc%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637666461187458627%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000&sdata=Y4OpoFWLzBOf9Lfzg%2BBc%2ByTSsnIh%2FQVU4FSfrU93L%2F0%3D&reserved=0>
>  ? Thanks.
> 
>  
> 
> 
> 
>  
> 
>  
> 
>  
> 
> From: Tao Li mailto:t...@zillow.com>>
> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Date: Friday, August 6, 2021 at 11:12 AM
> To: Alexey Romanenko  <mailto:aromanenko@gmail.com>>
> Cc: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>, Andrew Pilloud 
> mailto:apill...@google.com>>, Ismaël Mejía 
> mailto:ieme...@gmail.com>>, Kyle Weaver 
> mailto:kcwea...@google.com>>, Yuchu Cao 
> mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> Thanks @Alexey Romanenko <mailto:aromanenko@gmail.com> please see my 
> clarifications below.
> 
>  
> 
>  
> 
> | “Well, of course, if you read all fields (column

Re: Apache Beam 2.31 over Flink 1.13 with Java 11 throws RuntimeException

2021-09-28 Thread Alexey Romanenko
CC: dev@

> On 23 Sep 2021, at 15:48, Ohad Pinchevsky  wrote:
> According to the blog on the Beam website, Java 11 has already been supported since 2.27 
> https://beam.apache.org/blog/beam-2.27.0/ 
> 

I believe it's related only to the Java SDK containers, whereas the Flink (or Spark) 
job server containers are still built with Java 8. 
I think we need to fix it.


> But in the Dockerfile of Beam they have Java 8 installed:
> 
> https://hub.docker.com/layers/apache/beam_flink1.13_job_server/2.31.0/images/sha256-e727deacac9b16b5a05066998a8d39e0c2860ced0484246f16566bf384d8b028?context=explore
>  
> 
> 
> Flink 1.13 already support Java 11 
> https://github.com/apache/flink-docker/tree/master/1.13 
> 
> 
> Running mixed code results in an exception:
> 
> compiled by a more recent version of the Java Runtime (class file version 
> 55.0), this version of the Java Runtime only recognizes class file versions 
> up to 52.0
> 



Re: KinesisIO - support for enhanced fan-out

2021-08-30 Thread Alexey Romanenko
There is already an open Jira for this feature [1]. Pavel, feel free to add 
your thoughts there or on dev@. 

I left a comment on the related PR [2] and I'd be happy to help with moving this 
feature forward. It would be great to re-use the work that was already done.

—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-8511 

[2] https://github.com/apache/beam/pull/9899 



> On 28 Aug 2021, at 23:59, Ahmet Altay  wrote:
> 
> 
> 
> On Sat, Aug 28, 2021 at 1:42 AM Pavel Solomin  > wrote:
> Hello!
> 
> I would like to understand whether KinesisIO enhanced fan-out is something 
> Beam community sees useful to be added. Flink currently supports it - 
> https://issues.apache.org/jira/browse/FLINK-17688 
> .
> 
> I have seen the closed PR - https://github.com/apache/beam/pull/9899 
>  - and would like to know if there 
> is anyone currently working on enhanced fan-out feature. If not, what's the 
> best way to approach this? Create a Jira issue first, and then - discuss 
> design with d...@beam.apache.org 
> ?
> 
> I do not know the status of this. Your proposed approach (file a jira and 
> discuss on dev) sounds good.
>  
> 
> Thanks.
> 
> Best Regards,
> Pavel Solomin
> 
> Tel: +351 962 950 692  | Skype: pavel_solomin | 
> Linkedin 
> 
> 



Re: Perf issue with Beam on spark (spark runner)

2021-08-06 Thread Alexey Romanenko


> On 5 Aug 2021, at 18:17, Tao Li  wrote:
> 
> It was a great presentation!

Thanks!

>  Regarding my perf testing, I was not doing aggregation, filtering, 
> projection or joining. I was simply reading all the fields of parquet and 
> then immediately save PCollection back to parquet.

Well, of course, if you read all fields (columns) then you don't need column 
projection. Otherwise, it can give quite a significant performance boost, 
especially for large tables with many columns. 
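
For reference, a minimal sketch of such a projection (the schemas, bucket and 
path below are made up):

```java
PCollection<GenericRecord> records = pipeline.apply("ReadParquet",
    ParquetIO.read(fullSchema)                // full Avro schema of the Parquet files
        .withProjection(projectionSchema,     // only the columns you actually need
                        encoderSchema)        // schema used to encode the projected records
        .from("s3://my-bucket/path/to/*.parquet"));
```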


> Regarding SDF translation, is it enabled by default?

From Beam 2.30.0 release notes:

"Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI 
opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based 
Read transforms 
([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))”

—
Alexey

>  I will check out ParquetIO splittable. Thanks!
>  
> From: Alexey Romanenko 
> Date: Thursday, August 5, 2021 at 6:40 AM
> To: Tao Li 
> Cc: "user@beam.apache.org" , Andrew Pilloud 
> , Ismaël Mejía , Kyle Weaver 
> , Yuchu Cao 
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> It’s very likely that Spark SQL may have much better performance because of 
> SQL push-downs and avoiding additional ser/deser operations.
>  
> In the same time, did you try to leverage "withProjection()” in ParquetIO and 
> project only the fields that you needed? 
>  
> Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
>  
> Also, using SDF translation for Read on Spark Runner can cause performance 
> degradation as well (we noticed that in our experiments). Try to use non-SDF 
> read (if not yet) [2]
>  
>  
> PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m 
> not sure if a recording is already available but you can find the slides here 
> [3] that can be helpful.
>  
>  
> —
> Alexey
>  
> [1] https://issues.apache.org/jira/browse/BEAM-12070 
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12070&data=04%7C01%7Ctaol%40zillow.com%7Cc36172d0b4894ac802b708d958168457%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637637676001682824%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=Yq%2FODFNPo7XncHKExNDRBw6qRH2HSrymTcSGGRRWICs%3D&reserved=0>
> [2] https://issues.apache.org/jira/browse/BEAM-10670 
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10670&data=04%7C01%7Ctaol%40zillow.com%7Cc36172d0b4894ac802b708d958168457%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637637676001682824%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=ABQA4rB%2BeiMHIGdXQKiADS93F9%2F3bUfn4%2BCRRr4dgVI%3D&reserved=0>
> [3] 
> https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
>  
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fdrive.google.com%2Ffile%2Fd%2F17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O%2Fview%3Fusp%3Dsharing&data=04%7C01%7Ctaol%40zillow.com%7Cc36172d0b4894ac802b708d958168457%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637637676001692781%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=%2Fj0Qeibje5jk0Hiz9x57Pa92mRTyzvmTf63hOrNCPZ4%3D&reserved=0>
>  
> 
> 
>> On 5 Aug 2021, at 03:07, Tao Li mailto:t...@zillow.com>> 
>> wrote:
>>  
>> @Alexey Romanenko <mailto:aromanenko@gmail.com> @Ismaël Mejía 
>> <mailto:ieme...@gmail.com> I assume you are experts on spark runner. Can you 
>> please take a look at this thread and confirm this jira covers the causes 
>> https://issues.apache.org/jira/browse/BEAM-12646 
>> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cc36172d0b4894ac802b708d958168457%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637637676001692781%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=c23T9dKc0muC7sRWrsAYrewA4QKAUSc6tOAwe9kRfC4%3D&reserved=0>
>>  ?
>>  
>> This perf issue is currently a blocker to me..
>>  
>> Thanks so much!
>>  
>> From: Tao Li mailto:t...@zillow.com>>
>> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
>> mailto:user@beam.apache.org>>
>> Date: Friday, July 30, 2021 at 3:53 PM
>> To: Andrew Pilloud mailto:apill...@google.com>>, 
>> "user@beam.apache.org <mailto:user@beam.apache.org>" > <mailto:user@

Re: Perf issue with Beam on spark (spark runner)

2021-08-05 Thread Alexey Romanenko
It’s very likely that Spark SQL may have much better performance because of SQL 
push-downs and avoiding additional ser/deser operations.

At the same time, did you try to leverage "withProjection()" in ParquetIO to 
project only the fields that you need? 

Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?

Also, using the SDF translation for Read on the Spark runner can cause performance 
degradation as well (we noticed that in our experiments). Try the non-SDF 
read, if you are not using it already [2].


PS: Yesterday, at Beam Summit, we (Ismael and I) gave a related talk. I'm not 
sure if a recording is available yet, but you can find the slides here [3], 
which may be helpful.


—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-12070
[2] https://issues.apache.org/jira/browse/BEAM-10670
[3] 
https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
 

> On 5 Aug 2021, at 03:07, Tao Li  wrote:
> 
> @Alexey Romanenko <mailto:aromanenko@gmail.com> @Ismaël Mejía 
> <mailto:ieme...@gmail.com> I assume you are experts on spark runner. Can you 
> please take a look at this thread and confirm this jira covers the causes 
> https://issues.apache.org/jira/browse/BEAM-12646 
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cc40cbb6894a540dcd37008d952d578b9%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637631899081708037%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=UCxzySGVB8H%2B2tOmjDVN5FqeSVxarmD5c1gg3Xa4RKA%3D&reserved=0>
>  ?
>  
> This perf issue is currently a blocker to me..
>  
> Thanks so much!
>  
> From: Tao Li mailto:t...@zillow.com>>
> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Date: Friday, July 30, 2021 at 3:53 PM
> To: Andrew Pilloud mailto:apill...@google.com>>, 
> "user@beam.apache.org <mailto:user@beam.apache.org>"  <mailto:user@beam.apache.org>>
> Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu Cao 
> mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> Thanks everyone for your help.
>  
> We actually did another round of perf comparison between Beam (on spark) and 
> native spark, without any projection/filtering in the query (to rule out the 
> “predicate pushdown” factor).
>  
> The time spent on Beam with spark runner is still taking 3-5x period of time 
> compared with native spark, and the cause 
> ishttps://issues.apache.org/jira/browse/BEAM-12646 
> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-12646&data=04%7C01%7Ctaol%40zillow.com%7Cc40cbb6894a540dcd37008d952d578b9%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637631899081708037%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=UCxzySGVB8H%2B2tOmjDVN5FqeSVxarmD5c1gg3Xa4RKA%3D&reserved=0>
>  according to the spark metrics. Spark runner is pretty much the bottleneck.
>  
> 
>  
> From: Andrew Pilloud mailto:apill...@google.com>>
> Date: Thursday, July 29, 2021 at 2:11 PM
> To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Cc: Tao Li mailto:t...@zillow.com>>, Kyle Weaver 
> mailto:kcwea...@google.com>>, Yuchu Cao 
> mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
>  
> Andrew
>  
> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud  <mailto:apill...@google.com>> wrote:
>> Beam SQL doesn't currently have project pushdown for ParquetIO (we are 
>> working to expand this to more IOs). Using ParquetIO withProjection directly 
>> will produce better results.
>>  
>> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw > <mailto:rober...@google.com>> wrote:
>>> Could you try using Beam SQL [1] and see if that gives more similar result 
>>> to your Spark SQL query? I would also be curious if the performance is 
>>> sufficient using withProjection to only read the auction, price, and bidder 
>>> columns. 
>>>  
>>> [1] https://beam.apache.org/documentation/dsls/sql/overview/ 
>>> <https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fbeam.apache.org%2Fdocumentation%2Fdsls%2Fsql%2Foverview%2F&data=04%7C01%7Ctaol%40zillow.com%7Cc40cbb6894a540dcd37008d952d578b9%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637631899081698082%7CUnknown%7CT

Re: Beam Calcite SQL SparkRunner Performance

2021-07-06 Thread Alexey Romanenko
I think it's quite expected, since Spark may push down the SQL query (or some 
parts of it) to the IO and/or RDD level and apply different types of 
optimisations there, whereas Beam SQL translates an SQL query into a general 
Beam pipeline which is then translated by the SparkRunner into a Spark pipeline (in 
your case). 

So, potentially we can also have some push-downs here, like the schema projection 
that we already have for ParquetIO. I believe that "filters" can be the next 
step, but joins could be tricky since for now they are based on other Beam 
PTransforms.

—
Alexey

> On 6 Jul 2021, at 04:39, Tao Li  wrote:
> 
> @Alexey Romanenko <mailto:aromanenko@gmail.com> do you have any thoughts 
> on this issue? Looks like the dag compiled by “Beam on Spark” has many more 
> stages than native spark, which results in more shuffling and thus longer 
> processing time.
>  
> From: Yuchu Cao mailto:yuc...@trulia.com>>
> Date: Monday, June 28, 2021 at 8:09 PM
> To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Cc: Tao Li mailto:t...@zillow.com>>
> Subject: Beam Calcite SQL SparkRunner Performance 
>  
> Hi Beam community, 
>  
> We are trying to compare performance of Beam SQL on Spark with native Spark. 
> The query that used for the comparison is below. The nexmark_bid schema is in 
> parquet format and file size is about 35GB.
> SELECT auction, price FROM nexmark_bid WHERE auction = 1007 OR auction = 1020 
> OR auction = 2001 OR auction = 2019 OR auction = 1087
>  
> And we noticed that the Beam Spark jobs execution had 16 stages in total, 
> while Native spark job only had 2 stages; and the native Spark job is 7 times 
> faster than Beam Spark job with the same resource allocation settings in 
> spark-submit commands. 
>  
> Any reason why Beam Spark job execution created more stages and 
> mapPartitionRDDs than native Spark? Can the performance of such query be 
> improved in any way ? Thank you!
>  
> Beam Spark job stages and stage 11 DAG: 
>  
> 
> 
>  
>  
>  
> Native Spark job stages and stage 1 DAG: 
> 
> 



Re: SparkRunner

2021-07-01 Thread Alexey Romanenko
Hi Trevor,

A Beam portable Spark pipeline is, in the end, just a Spark pipeline which you run 
on a Spark cluster. So, all resources are managed by the processing engine (Spark in 
your case) and the cluster configuration; Beam doesn't handle errors on this level.

So, in your place, I'd investigate this issue at the Spark/EMR level. Quite likely 
you are hitting OOM limits on the Spark workers when you run the job on a mixed 
cluster.

—
Alexey  

> On 1 Jul 2021, at 13:19, Trevor Kramer  wrote:
> 
> Hi everyone. We have a Beam pipeline running using the portable Spark runner 
> on EMR. If we use 100% on-demand Core nodes the pipeline finishes 
> successfully. If we run a mix of on-demand Core nodes and spot Task nodes the 
> pipeline fails every time with the following error. Does Beam have resiliency 
> against losing nodes and does it schedule with awareness of Core vs Task 
> nodes?
> 
> Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Job 
> aborted due to stage failure: A shuffle map stage with indeterminate output 
> was failed and retried. However, Spark cannot rollback the ShuffleMapStage 5 
> to re-process the input data, and has to fail this job. Please eliminate the 
> indeterminacy by checkpointing the RDD before repartition and try again.
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>   at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>   at 
> org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:199)
>   at 
> org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
>   ... 5 more
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> A shuffle map stage with indeterminate output was failed and retried. 
> However, Spark cannot rollback the ShuffleMapStage 5 to re-process the input 
> data, and has to fail this job. Please eliminate the indeterminacy by 
> checkpointing the RDD before repartition and try again.
>   at org.apache.spark.scheduler.DAGScheduler.org 
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2136)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2124)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2123)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1674)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1666)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> 
> Thanks,
> 
> Trevor



Re: How to specify a spark config with Beam spark runner

2021-06-10 Thread Alexey Romanenko
Hi Tao,

"Limited spark options”, that you mentioned, are Beam's application arguments 
and if you run your job via "spark-submit" you should still be able to 
configure the Spark application with the usual spark-submit "--conf key=value" CLI 
option. 
Doesn't that work for you?
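
For example, something along these lines (the jar and main class names are made up):

```
spark-submit \
  --conf spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl \
  --master yarn --deploy-mode cluster \
  --class com.example.MyBeamPipeline \
  my-pipeline-shaded.jar \
  --runner=SparkRunner
```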

—
Alexey

> On 10 Jun 2021, at 01:29, Tao Li  wrote:
> 
> Hi Beam community,
>  
> We are trying to specify a spark config 
> “spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit 
> command for a beam app. I only see limited spark options supported according 
> to this doc: https://beam.apache.org/documentation/runners/spark/ 
> 
>  
> How can we specify an arbitrary spark config? Please advise. Thanks!



Re: Oracle JDBC driver with expansion service

2021-05-25 Thread Alexey Romanenko
Hi, 

This looks more like a user-related question, so let's continue this 
conversation on user@.

—
Alexey

> On 25 May 2021, at 15:32, Rafael Ribeiro  wrote:
> 
> Hi,
> 
> I'm trying to read and write on Oracle database using the JDBC driver of Beam
> 
> but I'm having some problems, specially on Dataflow that does not find the 
> jar dependency
> 
> could anyone help to solve this problem?
> 
> PS:
> I have to compile JDBC class with jar dependency on it and create a expansion 
> service to it
> no ideas
> -- 
> Rafael Fernando Ribeiro



Re: JdbcIO parallel read on spark

2021-05-25 Thread Alexey Romanenko
Hi,

Did you check the Spark DAG to see whether it forks into parallel branches after the 
"Genereate queries" transform?

—
Alexey

> On 24 May 2021, at 20:32, Thomas Fredriksen(External) 
>  wrote:
> 
> Hi there,
> 
> We are struggling to get the JdbcIO-connector to read a large table on spark.
> 
> In short - we wish to read a large table (several billion rows), transform 
> then write the transformed data to a new table.
> 
> We are aware that `JdbcIO.read()` does not parallelize. In order to solve 
> this, we attempted to create ranges then generate `limit/offset` queries and 
> use `JdbcIO.readAll()` instead.
> 
> The overall steps look something like this (sanitized for readability):
> 
> ```
> pipeline
>   .apply("Read row count", JdbcIO.read()
> .withQuery("select count(*) from MYTABLE;")
> .withCoder(VarLongCoder.of())
> .withOtherOptions(...))
>   .apply("Genereate queries", ParDo.of(new DoFn() {...}) // 
> Outputs table offsets
>   .apply("Read results", JdbcIO.readAll()
> .withCoder(SchemaCoder.of(...))
> .withOutputParallelization(false)
> .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
> .withParameterSetter((element, statement) -> statement.setLong(1, 
> element))
> .withOtherOptions(...))
>   .apply("more steps", ...);
> ```
> 
> The problem is that this does not seem to parallelize on the spark runner. 
> Only a single worker seem to be doing all the work.
> 
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`, 
> however this did not seem to make a difference.
> 
> Our goal is to avoid all data from the query be cached in memory between the 
> read and transform operations. This causes OOM-exceptions. Having a single 
> worker reading the database is okay as long as other workers can process the 
> data as soon as it is read and not having to wait for all the data to be 
> ready.
> 
> Any advice on how we approach this.
> 
> Best Regards
> Thomas Li Fredriksen



Re: File processing triggered from external source

2021-05-25 Thread Alexey Romanenko
You don't need a windowing strategy or aggregation triggers to perform GbK-like 
transforms in a pipeline with a bounded source, but since you started to use an 
unbounded source, your PCollections became unbounded and you do need them. 
Otherwise, it's unknown at which point in time your GbK transforms will have all 
the data available to process (in theory, that will never happen, by the very 
definition of "unbounded").

What is the issue with applying a windowing/triggering strategy in your case? A 
minimal example is sketched below.
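
An untested sketch of adding a trigger in the global window before the 
GroupByKey; the processing-time trigger, the 30-second delay, the discarding mode 
and the MyRecord element type are assumptions that depend on your latency and 
correctness needs:

```java
PCollection<KV<String, MyRecord>> windowed = input.apply("WindowForGbk",
    Window.<KV<String, MyRecord>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

PCollection<KV<String, Iterable<MyRecord>>> grouped =
    windowed.apply(GroupByKey.<String, MyRecord>create());
```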

—
Alexey

> On 24 May 2021, at 10:25, Sozonoff Serge  wrote:
> 
> Hi,
> 
> Referring to the explanation found at the following link under (Stream 
> processing triggered from an external source)
> 
> https://beam.apache.org/documentation/patterns/file-processing/ 
> 
> 
> 
> While implementing this solution I am trying to figure out how to deal with 
> the fact that my pipeline, which was bound, has now become unbound. It 
> exposes me to windowing/triggering concerns which I did not have de deal with 
> before and in essence are unnecessary since I am still fundamentally dealing 
> with bound data. The only reason I have an unbound source involved is as a 
> trigger and provider of the file to be processed.
> 
> Since my pipeline uses GroupByKey transforms I get the following error.
> 
> Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot 
> be applied to non-bounded PCollection in the GlobalWindow without a trigger. 
> Use a Window.into or Window.triggering transform prior to GroupByKey.
> 
> Do I really need to add windowing/triggering semantics to the PCollections 
> which are built from bound data ?
> 
> Thanks for any pointers.
> 
> Serge



Re: KafkaIO with DirectRunner is creating tons of connections to Kafka Brokers

2021-05-25 Thread Alexey Romanenko

> On 24 May 2021, at 10:43, Sozonoff Serge  wrote:
> 
> OK thanks.  Just to clarify, in my case the message throughput is zero when I 
> start the Beam pipeline up, and it will still crash once all file handles are 
> consumed even if I don't send a single message to the Kafka topic.

This sounds like a bug to me, even if it happens only with the DirectRunner. Would 
you mind providing the pipeline code and the run command that reproduce this 
issue?

—
Alexey

> 
> Thanks,
> Serge
> 
> On 24 May 2021 at 10:14:33, Jan Lukavský (je...@seznam.cz 
> ) wrote:
> 
>> It is not 100 consumers, the checkpoint is created every 100 records. So, if 
>> your message throughput is high enough, the consumers might be created 
>> really often. But most importantly - DirectRunner is really not intended for 
>> performance sensitive applications. You should use a different runner for 
>> that.
>> 
>> Best,
>> 
>>  Jan
>> 
>> On 5/24/21 10:03 AM, Sozonoff Serge wrote:
>>> Hi Jan,
>>> 
>>> So if I read your SO answer correctly and looking at the Github link you 
>>> provided we are talking about ~100 consumers ? Since I am developing 
>>> locally with a dockerized minimal Kafka broker it is possible that this is 
>>> enough to hit the max open files limit. 
>>> 
>>> Depending on your definition of “limited” I would say there are more than a 
>>> limited number present at the same time. If you look at the below log 
>>> extract everyone of those “Kafka version: 2.5.0” lines corresponds to a 
>>> Kafka consumer instantiation and that’s within a very short period of time 
>>> !! 
>>> 
>>> Thanks,
>>> Serge
>>> 
>>> 
>>> 
>>> [INFO ] 2021-05-24 09:53:48.663 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.688 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.803 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.815 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.864 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.871 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.955 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:48.969 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.046 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.052 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.113 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.128 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.231 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.236 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.278 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.281 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.316 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.321 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.435 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.444 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.486 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.494 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.564 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.575 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 2.5.0
>>> [INFO ] 2021-05-24 09:53:49.662 [direct-runner-worker] 
>>> apache.kafka.common.utils.AppInfoParser$AppInfo - 

Re: Exporting beam custom metrics to Prometheus

2021-04-28 Thread Alexey Romanenko
Hi,

Could it be related to [1] or [2]?

[1] https://issues.apache.org/jira/browse/BEAM-7438
[2] https://issues.apache.org/jira/browse/BEAM-10928

—
Alexey

> On 28 Apr 2021, at 08:45, Feba Fathima  wrote:
> 
> Hi,
> 
>I have a java beam pipeline which is run using a flink runner. I tried 
> adding custom metrics to it which I want to export to Prometheus. I would 
> like to know how to export the beam custom metrics to Prometheus. Please help 
> me with this. 
> 
> Thanks in advance
> Feba Fathima



Re: Writing to multiple S3 buckets in multiple regions

2021-04-28 Thread Alexey Romanenko
Hi Valeri,

For now, it's not possible to write to different AWS regions from the same 
write transform instance. There is an open Jira about this [1].

As a workaround (maybe not very effective, I didn't try it), I guess you can 
branch your input PCollection into several branches, depending on the number of 
destination regions, and then apply to every branch a write PTransform 
configured with a different AWS region. See an example of how to branch with 
multiple outputs here [2], under "A single transform that produces multiple outputs".
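
An untested sketch of such branching; the tags, the routing helper 
(isEuWest1Bucket) and the per-region write transforms (writeForEuWest1, 
writeForUsEast1) are assumptions:

```java
final TupleTag<KV<String, String>> euWest1 = new TupleTag<KV<String, String>>() {};
final TupleTag<KV<String, String>> usEast1 = new TupleTag<KV<String, String>>() {};

PCollectionTuple branches = input.apply("SplitByRegion",
    ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (isEuWest1Bucket(c.element().getKey())) {  // hypothetical routing helper
          c.output(c.element());                      // main output: eu-west-1
        } else {
          c.output(usEast1, c.element());             // additional output: us-east-1
        }
      }
    }).withOutputTags(euWest1, TupleTagList.of(usEast1)));

// Each branch then gets its own write transform, configured for its region.
branches.get(euWest1).apply("WriteEuWest1", writeForEuWest1);
branches.get(usEast1).apply("WriteUsEast1", writeForUsEast1);
```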

—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-10133
[2] 
https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections

> On 27 Apr 2021, at 16:04, Valeri Tsolov  wrote:
> 
> Hello all,
> we are trying to run an Apache Beam pipeline which reads from Pub/Sub and 
> writes to multiple S3 buckets. The problem comes from the fact that we do not 
> know the region of destination buckets in Amazon. The items are mixed on the 
> Pub/Sub side for multiple s3 buckets in multiple regions.
> 
> The code for writing to AWS is below. We are also seeing that it is required 
> to have a specified concrete region. 
> Is there a way to write from a single Apache Beam pipeline to multiple AWS 
> regions? 
> 
> Thanks,
> Valeri
> public class WritePTransform extends
> PTransform<PCollection<KV<String, String>>, WriteFilesResult<String>> {
>   private final String tempDirectory;
>   private final SerializableFunction<String, FileIO.Write.FileNaming> namingFn;
> 
>   public WritePTransform(String tempDirectory, SerializableFunction<String, FileIO.Write.FileNaming> namingFn) {
> this.tempDirectory = tempDirectory;
> this.namingFn = namingFn;
>   }
> 
>   @Override
>   public WriteFilesResult<String> expand(PCollection<KV<String, String>> 
> input) {
> return input.apply("WriteFilesToFileSystem",
> FileIO.<String, KV<String, String>>writeDynamic()
> .by(KV::getKey)
> .withDestinationCoder(StringUtf8Coder.of())
> .withTempDirectory(tempDirectory)
> .via(Contextful.fn(KV::getValue), TextIO.sink())
> .withNaming(namingFn)
> .withNumShards(1));
>   }
> }
> 
> CONFIDENTIALITY NOTICE – This message and any attached documents contain 
> information which may be confidential, subject to privilege or exempt from 
> disclosure under applicable law.  These materials may be used only by the 
> intended recipient of this communication.  You are hereby notified that any 
> distribution, disclosure, printing, copying, storage, modification or the 
> taking of any action in reliance on this communication is strictly 
> prohibited.  Delivery of this message to any person other than the intended 
> recipient shall not compromise or waive such confidentially, privilege or 
> exemption from disclosure as to this communication.  If you have received 
> this communication in error, please immediately notify the sender and delete 
> the message in its entirety from your system.
> 



Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-27 Thread Alexey Romanenko


> On 27 Apr 2021, at 08:39, Thomas Fredriksen(External) 
>  wrote:
> 
> Thank you, this is very informative.
> 
> We tried reducing the JdbcIO batch size from 1 to 1000, then to 100. In 
> our runs, we no longer see the explicit OOM-error, but we are seeing executor 
> heartbeat timeouts. From what we understand, this is typically caused by 
> OOM-errors also. However, the stage in question is reading from a web server 
> that can be slow to respond. Could it be that the request to the web server 
> is locking the executor long enough to cause the heartbeat timeout?

Well, I think it can, but I don't see how that would lead to an OOM in this case. Did 
you see anything about it in the logs?

> 
> On Mon, Apr 26, 2021 at 1:48 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> 
> 
>> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) 
>> mailto:thomas.fredrik...@cognite.com>> wrote:
>> 
>> The stack-trace for the OOM:
>> 
>> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57, 
>> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException: 
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> at 
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>  Source)
> 
> It may be caused by a large total size of batched records before WriteFn 
> flushes them. 
> Did you try to decrease it with “withBatchSize(long)” (by default, it’s 
> 1000)? [1]
> 
> [1] 
> https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-
>  
> <https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long->
>> at 
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>> at 
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
>> at 
>> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>> at 
>> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
>> at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>> at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>> at 
>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
>> at org.apache.spark.scheduler.Task.run(Task.scala:113)
>> at 
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>> 
>> I should note this exception is not always printed. The issue is usually 
>> represented by an ExecutorLostFailure (I am assuming this is caused by an 
>> OOM-error):
>> 
>> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace: 
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
>> to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: 
>> Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): 
>> ExecutorLostFailure 

Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-26 Thread Alexey Romanenko
eduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
> at org.apache.spark.rdd.RDD.foreach(RDD.scala:977)
> at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
> at 
> org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127)
> at 
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228)
> at 
> org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for 
> 4169729928902178844_6376508440917463187_job-95-run-16-action-295
> 
> xxx
> 
> On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Thomas,
> 
> Could you share the stack trace of your OOM and, if possible, the code 
> snippet of your pipeline? 
> Afaik, usually only “large" GroupByKey transforms, caused by “hot keys”, may 
> lead to OOM with SparkRunner.
> 
> —
> Alexey
> 
> 
> > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) 
> > mailto:thomas.fredrik...@cognite.com>> 
> > wrote:
> > 
> > Good morning,
> > 
> > We are ingesting a very large dataset into our database using Beam on 
> > Spark. The dataset is available through a REST-like API and is spliced in 
> > such a way that, in order to obtain the whole dataset, we must do around 
> > 24000 API calls.
> > 
> > All in all, this results in 24000 CSV files that need to be parsed then 
> > written to our database.
> > 
> > Unfortunately, we are encountering some OutOfMemoryErrors along the way. 
> > From what we have gathered, this is due to the data being queued between 
> > transforms in the pipeline. In order to mitigate this, we have tried to 
> > implement a streaming-scheme where the requests streamed to the request 
> > executor, the flows to the database. This too produced the OOM-error.
> > 
> > What are the best ways of implementing such pipelines so as to minimize the 
> > memory footprint? Are there any differences between runners we should be 
> > aware of here? (e.g. between Dataflow and Spark)
> 



Re: Avoiding OutOfMemoryError for large batch-jobs

2021-04-26 Thread Alexey Romanenko
Hi Thomas,

Could you share the stack trace of your OOM and, if possible, the code snippet 
of your pipeline? 
Afaik, usually only “large" GroupByKey transforms, caused by “hot keys”, may 
lead to OOM with SparkRunner.

—
Alexey


> On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) 
>  wrote:
> 
> Good morning,
> 
> We are ingesting a very large dataset into our database using Beam on Spark. 
> The dataset is available through a REST-like API and is spliced in such a way 
> that, in order to obtain the whole dataset, we must do around 24000 API 
> calls.
> 
> All in all, this results in 24000 CSV files that need to be parsed then 
> written to our database.
> 
> Unfortunately, we are encountering some OutOfMemoryErrors along the way. From 
> what we have gathered, this is due to the data being queued between 
> transforms in the pipeline. In order to mitigate this, we have tried to 
> implement a streaming-scheme where the requests streamed to the request 
> executor, the flows to the database. This too produced the OOM-error.
> 
> What are the best ways of implementing such pipelines so as to minimize the 
> memory footprint? Are there any differences between runners we should be 
> aware of here? (e.g. between Dataflow and Spark)



Re: JdbcIO SQL best practice

2021-04-15 Thread Alexey Romanenko
I don’t think so because this statement [1] is used in this case. 
 
[1] 
https://github.com/apache/beam/blob/97af0775cc19a4997a4b60c6a75d003f8e86cf1f/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java#L56
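
For reference, a minimal and untested sketch of letting JdbcIO generate that 
statement itself (a plain INSERT, not an upsert); the table name and the data 
source configuration are assumptions, and the input PCollection must carry a 
schema:

```java
// "rows" is a schema'd PCollection (e.g. PCollection<Row> or beans with a registered schema).
rows.apply("WriteToTable", JdbcIO.<Row>write()
    .withDataSourceConfiguration(dataSourceConfiguration)
    .withTable("my_table"));
```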

> On 14 Apr 2021, at 14:44, Thomas Fredriksen(External) 
>  wrote:
> 
> This seems very promising,
> 
> Will the write from PCollection handle upserts?
> 
> On Wed, Mar 24, 2021 at 6:56 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Thanks for details.
> 
> If I’m not mistaken, JdbcIO already supports both your suggestions for read 
> and write (at least, in some way) [1][2]. 
> 
> Some examples from tests:
> - write from PCollection [3], 
> - read to PCollection [4], 
> - write from PCollection with JavaBeanSchema [5] 
> 
> Is it something that you are looking for?
> 
> [1] https://issues.apache.org/jira/browse/BEAM-6674 
> <https://issues.apache.org/jira/browse/BEAM-6674>
> [2] https://github.com/apache/beam/pull/8725 
> <https://github.com/apache/beam/pull/8725>
> [3] 
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L469
>  
> <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L469>
> [4] 
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L524
>  
> <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L524>
> [5] 
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L469
>  
> <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java#L469>
> 
> 
>> On 23 Mar 2021, at 08:03, Thomas Fredriksen(External) 
>> mailto:thomas.fredrik...@cognite.com>> wrote:
>> 
>> That is a very good question.
>> 
>> Personally, I would prefer that read and write were simplified. I guess 
>> there will always be a need for writing complex queries, but the vast 
>> majority of pipelines will only need to read or write data to or from a 
>> table. As such, having read/write functions that will take an input-class 
>> (BEAN or POJO for example) and simply generate the required write-statement 
>> would be sufficient. Upserts should also be a part of this.
>> 
>> For example:
>> 
>> ```
>> PCollection<MyBean> collection = ...;
>> collection.apply("Write to database", JdbcIO.writeTable(MyBean.class)
>> .withDataSourceConfiguration(mySourceConfiguration)
>> .withTableName(myTableName)
>> .withUpsertOption(UpsertOption.create()
>> .withConflictTarget(keyColumn)
>> .withDoUpdate());
>> ```
>> This would of course assume that the columns of `myTableName` would match 
>> the members of `MyBean`.
>> 
>> There are of course technical challenges with this:
>> * How to handle situations where the column names do not match the input-type
>> * How to detect columns from the input-type.
>> 
>> As an alternative, schemas may be an option:
>> 
>> ```
>> PCollection<Row> collection = ...;
>> collection.apply("Write to database", JdbcIO.writeRows()
>> .withSchema(mySchema)
>> .withDataSourceConfiguration(mySourceConfiguration)
>> .withTableName(myTableName)
>> .withUpsertOption(UpsertOption.create()
>> .withConflictTarget(keyColumn)
>> .withDoUpdate());
>> ```
>> This would allow for greater flexibility, but we lose the type-strong nature 
>> of the first suggestion.
>> 
>> I hope this helps.
>> 
>> Best Regards
>> Thomas Li Fredriksen
>> 
>> On Fri, Mar 19, 2021 at 7:17 PM Alexey Romanenko > <mailto:aromanenko@gmail.com>> wrote:
>> Hmm, interesting question. Since we don’t have any answers yet, may I ask you 
>> a question - do you have an example of what these practices could look like, 
>> or how this could be simplified?
>> 
>> 
>> PS: Not sure that it can help, but JdbcIO allows setting a query with a 
>> “ValueProvider” option, which can be helpful to parametrise your transform 
>> with values that are only available during pipeline execution and can be 
>>

Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Alexey Romanenko
I can’t say exactly how it will work with Dataflow but for Spark Runner I 
answered it here [1]:

“Since KinesisIO is based on UnboundedSource.CheckpointMark, it uses the 
standard checkpoint mechanism, provided by Beam UnboundedSource.UnboundedReader.

Once a KinesisRecord has been read (actually, pulled from a records queue that 
is fed separately by fetching the records from the Kinesis shard), then 
the shard checkpoint will be updated [2] by using the record SequenceNumber and 
then, depending on runner implementation of UnboundedSource and checkpoints 
processing, will be saved.

Afaik, Beam Spark Runner uses the Spark States mechanism for this purpose. [3]”

Alexey


[1] 
https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
[2] 
https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java#L115
[3] 
https://github.com/apache/beam/blob/01ba86777ab4ff6c4432af0896d239ada9d5d2f1/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java#L200

> On 7 Apr 2021, at 00:12, Kenneth Knowles  wrote:
> 
> This sounds similar to the "Kafka Commit" in 
> https://github.com/apache/beam/pull/12572 
> <https://github.com/apache/beam/pull/12572> by +Boyuan Zhang 
> <mailto:boyu...@google.com> and also to how PubsubIO ACKs messages in the 
> finalizer. I don't know much about KinesisIO or how Kinesis works. I was just 
> asking to clarify, in case other folks know more, like +Alexey Romanenko 
> <mailto:aromanenko@gmail.com> and +Ismaël Mejía 
> <mailto:ieme...@gmail.com> have modified KinesisIO. If the feature does not 
> exist today, perhaps we can identify the best practices around this pattern.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  <mailto:adude3...@gmail.com>> wrote:
> Hi Kenn,
> 
> yes, resuming reading at the proper timestamp is exactly the issue we are 
> currently struggling with. E.g. with Kinesis Client Lib we could store the 
> last read position within some DynamoDB table. This mechanism is not used with 
> Beam; as we understand it, the runner is responsible for tracking that checkpoint mark.
> 
> Now, obviously when restarting the pipeline, e.g. after a non-compatible upgrade 
> where a pipeline update is just not feasible, there must be some 
> mechanism in place for how Dataflow will know where to continue. Is that 
> simply the pipeline name? Or is there more involved? So how does 
> checkpointing actually work here?
> 
> Based on 'name', wouldn't that imply that something like (example taken from 
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates 
> <https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates>)
> 
>   export REGION="us-central1"
> 
>   gcloud dataflow flex-template run "streaming-beam-sql-`date 
> +%Y%m%d-%H%M%S`" \
> --template-file-gcs-location "$TEMPLATE_PATH" \
> --parameters inputSubscription="$SUBSCRIPTION" \
> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
> --region "$REGION"
> will not resume from the last read position on rerun, because the name obviously changes 
> here?
> 
> best,
> 
> michel
> 
> 
> 
> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  <mailto:k...@apache.org>> wrote:
> I would assume the main issue is resuming reading from the Kinesis stream 
> from the last read? In the case for Pubsub (just as another example of the 
> idea) this is part of the internal state of a pre-created subscription.
> 
> Kenn
> 
> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  <mailto:adude3...@gmail.com>> wrote:
> Hi list,
> 
> with our current project we are implementing our streaming pipeline based on 
> Google Dataflow.
> 
> Essentially we receive input via Kinesis, doing some filtering, enrichment 
> and sessionizing and output to PubSub and/or google storage.
> 
> After some short investigation, it is not clear to us how checkpointing will work 
> when running on Dataflow in connection with KinesisIO. Is there any 
> documentation/discussions to get a better understanding on how that will be 
> working? Especially if we are forced to restart our pipelines, how could we 
> ensure not to loose any events?
> 
> As far as I understand currently, it should work 'auto-magically' but it is 
> not yet clear to us how it will actually behave. Before we try to start 
> testing our expectations or even try to implement some watermark tracking by 
> ourselves, we hoped to get some insights from other users here.
> 
> Any help appreciated.
> 
> Best,
> 
> michel
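
For the restart case (when a pipeline update is not possible), a minimal sketch of a fallback outside of runner checkpointing: track the last processed event time yourself and re-read the stream from that timestamp on the new run, accepting that some records may be re-read. lastProcessedTimestamp and "my-stream" are assumed names, and the AWS client/credentials configuration is omitted:

// Loaded from your own store, ideally minus a safety margin.
Instant lastProcessedTimestamp = ...;

pipeline.apply("Read Kinesis",
    KinesisIO.read()
        .withStreamName("my-stream")
        .withInitialTimestampInStream(lastProcessedTimestamp));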



Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Alexey Romanenko
I think you are right: since "writer.close()” contains business logic, it 
must be moved to @FinishBundle. The same applies to DeleteFn.
I’ll create a Jira for that.
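
A minimal sketch of what that change could look like for the WriteFn quoted below; the flush() method on Mutator is assumed here, and the actual fix may differ:

@Setup
public void setup() {
  writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
}

@ProcessElement
public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
  writer.mutate(c.element());
}

@FinishBundle
public void finishBundle() throws Exception {
  // Wait here for all async writes issued in this bundle, so elements are only
  // treated as written once the bundle is committed.
  writer.flush();
}

@Teardown
public void teardown() throws Exception {
  // Only resource cleanup here, no business logic.
  writer.close();
  writer = null;
}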

> On 25 Mar 2021, at 00:49, Kenneth Knowles  wrote:
> 
> Alex's idea sounds good and like what Vincent maybe implemented. I am just 
> reading really quickly so sorry if I missed something...
> 
> Checking out the code for the WriteFn I see a big problem:
> 
> @Setup
> public void setup() {
>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
> }
> 
> @ProcessElement
>   public void processElement(ProcessContext c) throws ExecutionException, 
> InterruptedException {
>   writer.mutate(c.element());
> }
> 
> @Teardown
> public void teardown() throws Exception {
>   writer.close();
>   writer = null;
> }
> 
> It is only in writer.close() that all async writes are waited on. This needs 
> to happen in @FinishBundle.
> 
> Did you discover this when implementing your own Cassandra.Write?
> 
> Until you have waited on the future, you should not output the element as 
> "has been written". And you cannot output from the @TearDown method which is 
> just for cleaning up resources.
> 
> Am I reading this wrong?
> 
> Kenn
> 
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  <mailto:ajam...@google.com>> wrote:
> How about a PCollection containing every element which was successfully 
> written?
> Basically the same things which were passed into it.
> 
> Then you could act on every element after its been successfully written to 
> the sink.
> 
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw  <mailto:rober...@google.com>> wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  <mailto:ieme...@gmail.com>> wrote:
> +dev
> 
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
> 
> My proposal is that one returns a PCollection that consists, internally, 
> of something contentless like nulls. This is future compatible with returning 
> something more meaningful based on the sink or write process itself, 
> but at least this would be followable. 
>  
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>  
> <https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E>
> 
> Yeah, we should go ahead and finally do something. 
>  
> 
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible.
> 
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge if the upgrade to return something
> different from PDone is worth it; in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
> 
> 
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> mailto:aromanenko@gmail.com>> wrote:
> >
> > I thought that was said about returning a PCollection of write results as 
> > it’s done in other IOs (as I mentioned as examples) that have _additional_ 
> > write methods, like “withWriteResults()” etc, that return PTransform<…, 
> > PCollection>.
> > In this case, we keep backwards compatibility and just add new 
> > functionality. Though, we need to follow the same pattern for the user API and 
> > maybe even naming for this feature across different IOs (like we have for 
> > "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw  > <mailto:rober...@google.com>> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible. PRs to add non-PDone returning 
> > variants (probably as another option to the builders) that compose well 
> > with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko  > <mailto:aromanenko@gmail.com>> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it was 
> >> mentioned before, it doesn’t work with PDone, only with PCollection as a 
> &
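
As a concrete illustration of that pattern, a minimal sketch using JdbcIO, whose write() can return a PCollection via withResults() in recent Beam releases; the table names, element type and dataSourceConfiguration are assumed:

PCollection<KV<Integer, String>> rows = ...;

// The first write returns a PCollection<Void> signal instead of PDone.
PCollection<Void> firstWriteDone =
    rows.apply("Write to first table",
        JdbcIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(dataSourceConfiguration)
            .withStatement("INSERT INTO first_table (id, name) VALUES (?, ?)")
            .withPreparedStatementSetter((kv, stmt) -> {
              stmt.setInt(1, kv.getKey());
              stmt.setString(2, kv.getValue());
            })
            .withResults());

// The second write only starts once the first one has completed (per window).
rows.apply("Wait for first write", Wait.on(firstWriteDone))
    .apply("Write to second table",
        JdbcIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(dataSourceConfiguration)
            .withStatement("INSERT INTO second_table (id, name) VALUES (?, ?)")
            .withPreparedStatementSetter((kv, stmt) -> {
              stmt.setInt(1, kv.getKey());
              stmt.setString(2, kv.getValue());
            }));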
