Survey/Pulse: Beam on Flink/Spark/Samza/etc

2024-04-29 Thread Austin Bennett
Curious who all is using Beam on runners other than Dataflow.

Please respond either on-list or to me directly ...  Mostly just curious
about the extent to which Beam is fulfilling its promise of runner
agnosticism.  Getting good data on that is hard, so anecdotes would be very
welcome!


[Question] Apache Beam Spark Runner Support - Spark 3.5, Scala 2.13 Environment

2024-03-22 Thread Sri Ganesh Venkataraman
Hello,

Does the latest Apache Beam version support a Spark 3.5, Scala 2.13
environment for the Spark runner?
Based on initial analysis, the Spark runner supports Spark 3.5.0.
*Reference*:
https://github.com/apache/beam/blob/f660f49b6cf5d241285b156ccc4a5f52d82cfa63/runners/spark/3/build.gradle#L36

I would like to know about compatibility with Scala 2.13.

Based on the references below, support is extended only to Scala 2.12
(the libraries are compiled with Scala 2.12):
https://github.com/apache/beam/blob/0dc330e6d53cce51b13bcb652e80859c1b3a5975/runners/spark/3/build.gradle#L24
https://mvnrepository.com/artifact/org.apache.beam/beam-runners-spark-3/2.54.0

Any insight/future roadmap plans on Scala 2.13 support would be very helpful.

Thanks,
Sri Ganesh V


Beam on spark yarn could only start one worker

2024-01-04 Thread Huiming Jiang



Hello,
  We are using Beam with the Python SDK and want to deploy to Spark on a YARN cluster.
  But we found that it could only start one Spark worker, and the Beam SDK Docker container is started only on that worker.
  - The printed Beam pipeline options include:
{key:"beam:option:spark_master:v1" value:{string_value:"local[4]"}}  
{key:"beam:option:direct_runner_use_stacked_bundle:v1" value:{bool_value:true}}  

  Our steps:
First, we generate the jar file from the Python pipeline:
python3.7 -m  beam_demo \
--spark_version=3 \
--runner=SparkRunner \
--spark_job_server_jar=/home/hadoop/beam-runners-spark-3-job-server-2.52.0.jar \
--sdk_container_image=apache/beam_python3.7_sdk_boto3:2.45.0   \
--sdk_harness_container_image=apache/beam_python3.7_sdk_boto3:2.45.0 \
--worker_harness_container_image=apache/beam_python3.7_sdk_boto3:2.45.0 \
--environment_type=DOCKER --environment_config=apache/beam_python3.7_sdk_boto3:2.45.0 \
--sdk_location=container  \
--num_workers=2 \
--output_executable_path=jars/beam_demo.jar

  Second, we submit it to Spark:
 spark-submit   --master yarn --deploy-mode cluster jars/beam_demo.jar

  
Our questions are:
1. We configured the SparkRunner. Why does the printed output show the direct_runner_use_stacked_bundle option?
2. We submitted the job to Spark on YARN. Why is the printed spark_master local[4]?
3. Can Python Beam on a Spark YARN cluster start multiple workers?

Could you please help check these questions? Maybe we are missing something. Thank you very much for your help.
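
A side note on the two printed values above: the serialized pipeline options include the defaults of every registered option group, not only those of the runner actually in use. A minimal sketch (assuming a recent Beam Python SDK) that makes this visible locally:

```
from apache_beam.options.pipeline_options import PipelineOptions

# Build options roughly the way the jar-generating command above does
# (most flags omitted for brevity).
opts = PipelineOptions(['--runner=SparkRunner', '--spark_version=3'])

# get_all_options() also returns defaults of option groups that are not in
# effect, e.g. direct_runner_use_stacked_bundle=True (a DirectRunner default)
# and spark_master_url='local[4]' (a Spark option default); defaults like
# these can show up in the printed options even when they are not used by
# the chosen deployment.
for name, value in sorted(opts.get_all_options().items()):
    print(name, '=', value)
```

This does not by itself explain the single-worker behaviour, but it separates the option-printing questions (1 and 2) from the parallelism question (3).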
  


 



SparkRunner / PortableRunner Spark config besides Spark Master

2023-12-08 Thread Janek Bevendorff

Hi,

I'm struggling to figure out the best way to make Python Beam jobs 
execute on a Spark cluster running on Kubernetes. Unfortunately, the 
available documentation is incomplete and confusing at best.


The most flexible way I found was to compile a JAR from my Python job 
and submit that via spark-submit. Unfortunately, this seems to be 
extremely buggy and I cannot get logs from the SDK containers fed through 
the Spark executors back to the driver. See: 
https://github.com/apache/beam/issues/29683


The other way would be to use a Beam job server, but here I cannot find 
a sensible way to set any Spark config options besides the master URL. I 
have a spark-defaults.conf with vital configuration, which needs to be 
passed to the job. I see two ways forward here:


1) I could let users run the job server locally in a Docker container. 
This way they could potentially mount their spark-defaults.conf 
somewhere, but I don't really see where (pointers here?). They would 
also need to mount their Kubernetes access credentials somehow, 
otherwise the job server cannot access the cluster.


2) I could run the Job server in the Kubernetes cluster, which would 
resolve the Kubernetes credential issue but not the Spark config issue. 
Though, even if that were solved, I would now force all users to use the 
same Spark config (not ideal).


Is there a better way? From what I can see, the compiled JAR is the only 
viable option, but the log issue is a deal breaker.


Thanks
Janek
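
For context, the job-server path mentioned above looks roughly like this from the Python side; the endpoint and SDK image below are placeholders, and notably there is no pipeline option for arbitrary spark.* settings, which is exactly the gap described here:

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values: a Beam Spark job server reachable at localhost:8099 and
# a stock Python SDK harness image. Spark settings such as
# spark.executor.memory have no counterpart among these options.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',
    '--environment_config=apache/beam_python3.10_sdk:2.52.0',
])

with beam.Pipeline(options=options) as pipeline:
    _ = pipeline | beam.Create([1, 2, 3]) | beam.Map(print)
```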





Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment

2023-11-09 Thread Alexey Romanenko
I already added Spark 3.5.0 to the Beam Spark version tests [1] and I 
didn’t notice any regression. 

The next Beam release (2.53.0) should be available in a couple of months, 
depending on the release preparation process.

—
Alexey

[1] https://github.com/apache/beam/pull/29327

> On 9 Nov 2023, at 06:37, Giridhar Addepalli  wrote:
> 
> Thank you Alexey for sharing the details.
> 
> Can you please let us know if you are planning to add Spark 3.5.0 
> compatibility test as part of Beam 2.53.0 or not.
> If so, approximately what is the timeline we are looking at for Beam 2.53.0 
> release.
> https://github.com/apache/beam/milestone/17
> 
> Thanks,
> Giridhar.
> 
> On Tue, Nov 7, 2023 at 6:24 PM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
>> Hi Giridhar,
>> 
>>> On 4 Nov 2023, at 08:04, Giridhar Addepalli >> <mailto:giridhar1...@gmail.com>> wrote:
>>> 
>>>  Thank you Alexey for the response.
>>> 
>>> We are using Beam 2.41.0 with Spark 3.3.0 cluster. 
>>> We did not run into any issues. 
>>> is it because in Beam 2.41.0, compatibility tests were run against spark 
>>> 3.3.0 ?
>>> https://github.com/apache/beam/blob/release-2.41.0/runners/spark/3/build.gradle
>> 
>> Correct. 
>> There are some incompatibilities between Spark 3.1/3.2/3.3 versions and we 
>> fixed this for Spark runner in Beam 2.41 to make it possible to compile and 
>> run with different Spark versions. That was a goal of these compatibility 
>> tests.
>> 
>> Fixes #22156: Fix Spark3 runner to compile against Spark 3.2/3.3 by mosche · 
>> Pull Request #22157 · apache/beam
>> <https://github.com/apache/beam/pull/22157>
>> 
>>> If so, since compatibility tests were not run against Spark 3.5.0 even in 
>>> latest release of Beam 2.52.0, is it not advised to use Beam 2.52.0 with 
>>> Spark 3.5.0 cluster ?
>> 
>> I’d say, for now it's up to user to test and run it since it was not tested 
>> on Beam CI. 
>> I’m going to add this version for future testing.
>> 
>> —
>> Alexey
>> 
>>> 
>>> Thanks,
>>> Giridhar.
>>> 
>>> On 2023/11/03 13:05:45 Alexey Romanenko wrote:
>>> > AFAICT, the latest tested (compatibility tests) version for now is 3.4.1 
>>> > [1] We may try to add 3.5.x version there.
>>> >  
>>> > I believe that ValidateRunners tests are run only against default Spark 
>>> > 3.2.2 version.
>>> >
>>> > —
>>> > Alexey
>>> >
>>> > [1] 
>>> > https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36
>>> >
>>> >
>>> > > On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman >> > > <mailto:sr...@gmail.com>> wrote:
>>> > >
>>> > > Does Apache Beam version (2.41.0)  or latest (2.51.0) support Spark 3.5 
>>> > > environment for spark runner ?
>>> > >
>>> > > Apache Beam - Spark Runner Documentation states -
>>> > > The Spark runner currently supports Spark’s 3.2.x branch
>>> > >
>>> > > Thanks
>>> > > Sri Ganesh V
>>> >
>>> > 
>> 



RE: Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment

2023-11-04 Thread Giridhar Addepalli
 Thank you Alexey for the response.


We are using Beam 2.41.0 with Spark 3.3.0 cluster.
We did not run into any issues.

is it because in Beam 2.41.0, compatibility tests were run against spark
3.3.0 ?
https://github.com/apache/beam/blob/release-2.41.0/runners/spark/3/build.gradle

If so, since compatibility tests were not run against Spark 3.5.0 even in
latest release of Beam 2.52.0, is it not advised to use Beam 2.52.0 with
Spark 3.5.0 cluster ?

Thanks,

Giridhar.


On 2023/11/03 13:05:45 Alexey Romanenko wrote:

> AFAICT, the latest tested (compatibility tests) version for now is 3.4.1
> [1] We may try to add 3.5.x version there.
>
> I believe that ValidateRunners tests are run only against default Spark
> 3.2.2 version.
>
> —
> Alexey
>
> [1]
> https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36
>
>
> > On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman wrote:
> >
> > Does Apache Beam version (2.41.0) or latest (2.51.0) support Spark 3.5
> > environment for spark runner ?
> >
> > Apache Beam - Spark Runner Documentation states -
> > The Spark runner currently supports Spark’s 3.2.x branch
> >
> > Thanks
> > Sri Ganesh V


Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment

2023-11-03 Thread Alexey Romanenko
AFAICT, the latest tested (compatibility tests) version for now is 3.4.1 [1]. We 
may try to add a 3.5.x version there.

I believe that ValidatesRunner tests are run only against the default Spark 3.2.2 
version.

—
Alexey

[1] 
https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36


> On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman 
>  wrote:
> 
> Does Apache Beam version (2.41.0)  or latest (2.51.0) support Spark 3.5 
> environment for spark runner ?
> 
> Apache Beam - Spark Runner Documentation states -
> The Spark runner currently supports Spark’s 3.2.x branch
> 
> Thanks
> Sri Ganesh V



[Question] Apache Beam Spark Runner Support - Spark 3.5 Environment

2023-11-02 Thread Sri Ganesh Venkataraman
Does Apache Beam version 2.41.0 or the latest (2.51.0) support a Spark 3.5
environment for the Spark runner?

Apache Beam - Spark Runner Documentation states -
The Spark runner currently supports Spark’s 3.2.x branch

Thanks
Sri Ganesh V


Re: Passing "conf" arguments using a portable runner in Java (spark job runner)

2023-07-20 Thread Moritz Mack
Hi Jon,

sorry for the late reply.

A while ago I was struggling with this as well. Unfortunately, there’s no direct way 
to do this per pipeline.
However, you can set default arguments by passing them to the job service 
container using the environment variable _JAVA_OPTIONS.

I hope this still helps!
Cheers,
Moritz



On 30.06.23, 20:29, "Jon Molle via user"  wrote:


Hi,

I'm having trouble trying to figure out where to pass "conf" spark-submit 
arguments to a spark job service. I don't particularly care at this point 
whether the job service uses the same set of args for all jobs passed to the 
service, I'm just having trouble finding where I can send these args where they 
will get picked up by the service.

For reference, I'm using the portable pipeline options (IE: the portable 
runner, job endpoint, and DOCKER environment type).

Has anyone tried this, specifically in Java? I'm actually writing in Kotlin, 
but all the examples of the portable runner are in Python, which is 
significantly different from the Java design.

Thanks!



Re: Does beam spark runner support yarn-cluster mode

2023-07-12 Thread Kai Jiang
Yeah, I think you could check out this document:
https://beam.apache.org/documentation/runners/spark/#running-on-dataproc-cluster-yarn-backed
The Spark runner should support running in yarn-cluster mode.

On Tue, Jul 11, 2023 at 7:58 PM Jeff Zhang  wrote:

> Hi all,
>
> I didn't find much material about spark runner, just wondering
> whether beam spark runner support yarn-cluster mode. Thanks.
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Does beam spark runner support yarn-cluster mode

2023-07-11 Thread Jeff Zhang
Hi all,

I didn't find much material about the Spark runner; just wondering whether the Beam
Spark runner supports yarn-cluster mode. Thanks.


-- 
Best Regards

Jeff Zhang


Passing "conf" arguments using a portable runner in Java (spark job runner)

2023-06-30 Thread Jon Molle via user
Hi,

I'm having trouble trying to figure out where to pass "conf" spark-submit
arguments to a spark job service. I don't particularly care at this point
whether the job service uses the same set of args for all jobs passed to
the service, I'm just having trouble finding where I can send these args
where they will get picked up by the service.

For reference, I'm using the portable pipeline options (i.e., the portable
runner, job endpoint, and DOCKER environment type).

Has anyone tried this, specifically in Java? I'm actually writing in
Kotlin, but all the examples of the portable runner are in Python, which is
significantly different from the Java design.

Thanks!


[Question]dataproc(gce)+spark+beam(python) seems only use one worker

2023-03-06 Thread 赵 毓文
Hi Beam Community,

I followed this
doc (https://beam.apache.org/documentation/runners/spark/#running-on-dataproc-cluster-yarn-backed)
and tried to use Dataproc to run my Beam pipeline.

I created a Dataproc cluster (1 master, 2 workers), but I could only see one 
worker working (I used `docker ps` to check the worker nodes).
I also increased the cluster to 4 workers and got the same result.

Are there some parameters that I could use, or am I doing something wrong?
I also tried dataproc (GCE) + Flink + Beam (Python) with the `parallelism` option, 
which did not seem to work either.


This is my code.
```
import apache_beam as beam
import logging
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=SparkRunner',
    '--output_executable_path=job.jar',
    '--spark_version=3',
])

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=','):
    self.delimiter = delimiter

  def process(self, text):
    import time
    time.sleep(10)  # simulate slow per-element work

    for word in text.split(self.delimiter):
      yield word

logging.getLogger().setLevel(logging.INFO)
data = ['Strawberry,Carrot,Eggplant', 'Tomato,Potato'] * 800


with beam.Pipeline(options=options) as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create(data)
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print))
```
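
One thing often worth trying with a pipeline like the one above: beam.Create() on a small in-memory list tends to produce very few bundles, so the slow ParDo can end up on a single worker. A minimal, unverified sketch that inserts a Reshuffle to redistribute the elements first (the original SplitWords DoFn would work the same way as the FlatMap used here):

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=SparkRunner',
    '--output_executable_path=job.jar',
    '--spark_version=3',
])

data = ['Strawberry,Carrot,Eggplant', 'Tomato,Potato'] * 800

with beam.Pipeline(options=options) as pipeline:
  _ = (
      pipeline
      | 'Gardening plants' >> beam.Create(data)
      # Reshuffle breaks fusion with Create and rebalances elements across
      # workers before the slow per-element processing runs.
      | 'Spread work' >> beam.Reshuffle()
      | 'Split words' >> beam.FlatMap(lambda line: line.split(','))
      | beam.Map(print))
```

Whether this actually spreads work across the Dataproc workers also depends on the Spark-side parallelism (for example spark.default.parallelism and the number of executors requested), so it is only a starting point.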

Thanks
Yuwen Zhao


[Announcement] Planned removal of Spark 2 runner support in 2.46.0

2023-01-23 Thread Moritz Mack
Dear All,

The runner for Spark 2 was deprecated quite a while back in August 2022 with 
the release of Beam 2.41.0 [1]. We’re planning to move ahead with this and 
finally remove support for Spark 2 (beam-runners-spark) to only maintain 
support for Spark 3 (beam-runners-spark-3) going forward.

Note, Spark 2.4.8 is the latest release of Spark 2. It was released in May 2021 
and no further releases of 2.4.x should be expected, even for bug fixes [2].

If you are still relying on the Spark 2 runner and there are good reasons not to 
migrate to Spark 3, please reach out and we might reconsider this.

Kind regards,
Moritz


[1] https://github.com/apache/beam/blob/master/CHANGES.md#2410---2022-08-23
[2] https://spark.apache.org/versioning-policy.html







Re: Metrics in Beam+Spark

2022-07-18 Thread Moritz Mack
Yes, that’s exactly what I was referring to.

A - hopefully - easy way to avoid this problem might be to change the Spark 
configuration to use the following:

--conf "spark.metrics.conf.driver.sink.jmx.class"="com.salesforce.einstein.data.platform.connectors.JmxSink"
--conf "spark.metrics.conf.executor.sink.jmx.class"="org.apache.spark.metrics.sink.JmxSink"

Beam metrics are only exposed on the driver, so it’s enough to use your custom 
Sink on the driver. Those classpath issues are limited to executor nodes if I 
remember right (and I’d be surprised to see the same on the driver). Though, I 
haven’t tested …

By distributing such code by other means, I meant adding the relevant classes 
to the system classpath of the executor ($SPARK_HOME/jars). How to do that 
depends heavily on your infrastructure. For example, on Kubernetes it could be 
a base image that already contains the custom sink; on EMR it could be a 
bootstrap action that copies the sink from S3 … Of course, that’s not great 
because it affects everything running on the cluster with all the potential 
problems this creates.

/ Moritz



On 16.07.22, 07:54, "Yushu Yao"  wrote:

Hi Moritz, When dealing with custom sinks, it’s fairly common to run into 
classpath issues :⁠​/ The metric system is loaded when an executor starts up, 
by then the application classpath isn’t available yet.⁠​ Typically, that means 
distributing
ZjQcmQRYFpfptBannerStart
This Message Is From an External Sender
This message came from outside your organization.
Exercise caution when opening attachments or clicking any links.
ZjQcmQRYFpfptBannerEnd
Hi Moritz,


When dealing with custom sinks, it’s fairly common to run into classpath issues 
:/ The metric system is loaded when an executor starts up, by then the 
application classpath isn’t available yet. Typically, that means distributing 
such code by other means.


Could you also elaborate a bit about "distributing such code by other means"?

I wrapped JmxSink (just like CsvSink for beam). And now gets the following 
error.

ERROR MetricsSystem: Sink class 
com.salesforce.einstein.data.platform.connectors.JmxSink cannot be instantiated
 Caused by: java.lang.ClassNotFoundException: 
com.salesforce.einstein.data.platform.connectors.JmxSink

I guess this is the classpath issue you were referring to above. Any hints on 
how to fix it will be greatly appreciated.
Thanks!
-Yushu

/Moritz


On 14.07.22, 21:26, "Yushu Yao" 
mailto:yao.yu...@gmail.com>> wrote:

Hi Team,
Does anyone have a working example of a beam job running on top of spark? So 
that I can use the beam metric syntax and the metrics will be shipped out via 
spark's infra?

The only thing I achieved is to be able to queryMetrics() every half second and 
copy all the metrics into the spark metrics.
Wondering if there is a better way?

Thanks!
-Yushu


MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.inNamespace(namespace))
.build());

for (MetricResult cc : metrics.getGauges()) {
LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
com.codahale.metrics.Gauge gauge =
new SimpleBeamGauge(cc.getAttempted().getValue());
try {
String name = metricName(cc.getKey(), addStepName, addNamespace);
if (registry.getNames().contains(name)) {
LOGGER.info("Removing metric {}", name);
registry.remove(name);
}
registry.register(name, gauge);
} catch (IllegalArgumentException e) {
LOGGER.warn("Duplicated metrics found. Try turning on 
addStepName=true.", e);
}
}





Re: Metrics in Beam+Spark

2022-07-15 Thread Yushu Yao
Hi Moritz,


>
> When dealing with custom sinks, it’s fairly common to run into classpath
> issues :/ The metric system is loaded when an executor starts up, by then
> the application classpath isn’t available yet. Typically, that means
> distributing such code by other means.
>
>
>

Could you also elaborate a bit about "distributing such code by other
means"?

I wrapped JmxSink (just like the CsvSink for Beam) and now get the following
error:

ERROR MetricsSystem: Sink class
com.salesforce.einstein.data.platform.connectors.JmxSink cannot be
instantiated
 Caused by: java.lang.ClassNotFoundException:
com.salesforce.einstein.data.platform.connectors.JmxSink

I guess this is the classpath issue you were referring to above. Any hints
on how to fix it will be greatly appreciated.
Thanks!
-Yushu

/Moritz
>
>
>
>
>
> On 14.07.22, 21:26, "Yushu Yao"  wrote:
>
>
>
>
> Hi Team,
>
> Does anyone have a working example of a beam job running on top of spark?
> So that I can use the beam metric syntax and the metrics will be shipped
> out via spark's infra?
>
>
>
> The only thing I achieved is to be able to queryMetrics() every half
> second and copy all the metrics into the spark metrics.
>
> Wondering if there is a better way?
>
>
>
> Thanks!
>
> -Yushu
>
>
>
> MetricQueryResults metrics =
> pipelineResult
> .metrics()
> .queryMetrics(
> MetricsFilter.*builder*()
> .addNameFilter(MetricNameFilter.*inNamespace*(namespace))
> .build());
>
> for (MetricResult cc : metrics.getGauges()) {
> *LOGGER*.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
> com.codahale.metrics.Gauge gauge =
> new SimpleBeamGauge(cc.getAttempted().getValue());
> try {
> String name = metricName(cc.getKey(), addStepName, addNamespace);
> if (registry.getNames().contains(name)) {
> *LOGGER*.info("Removing metric {}", name);
> registry.remove(name);
> }
> registry.register(name, gauge);
> } catch (IllegalArgumentException e) {
> *LOGGER*.warn("Duplicated metrics found. Try turning on 
> addStepName=true.", e);
> }
> }
>
>
>
>


Re: Metrics in Beam+Spark

2022-07-15 Thread Moritz Mack
Yes, you would have to wrap the Spark JmxSink the same way it’s done for the 
CSV or Graphite one, see [1].
This is necessary to expose Gauges provided by Beam to the sinks.

However, if you are on Spark 3 it’s possible to use the new plugin framework of 
Spark (see [2]). That’s something I was planning to work on but haven’t found time 
yet. Using a driver plugin, respective gauges could be registered in Spark’s 
internal metrics registry (available from PluginContext [3]) without using 
custom sinks.

Btw, all of this is happening on the driver. So actually, there’s no trouble to 
expect with the executor classpath.

/ Moritz


[1] 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java#L35
[2] 
https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/SparkPlugin.html
[3] 
https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/PluginContext.html#metricRegistry--


On 15.07.22, 07:47, "Yushu Yao"  wrote:

Thanks Mortiz!

We are using jmx to ship the metrics out of spark. Most of the spark built-in 
driver and executor metrics are going out fine.
Does this require us to make another sink?

-Yushu


On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack 
mailto:mm...@talend.com>> wrote:
Hi Yushu,

Wondering, how did you configure your Spark metrics sink? And what version of 
Spark are you using?

Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"

Currently there’s support for CSV and Graphite, but it’s simple to support 
others. The sinks typically wrap the corresponding Spark sinks and integrate 
with the Metrics registry of the Spark metric system.

https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html

When dealing with custom sinks, it’s fairly common to run into classpath issues 
:/ The metric system is loaded when an executor starts up, by then the 
application classpath isn’t available yet. Typically, that means distributing 
such code by other means.

/Moritz


On 14.07.22, 21:26, "Yushu Yao" 
mailto:yao.yu...@gmail.com>> wrote:

Hi Team,
Does anyone have a working example of a beam job running on top of spark? So 
that I can use the beam metric syntax and the metrics will be shipped out via 
spark's infra?

The only thing I achieved is to be able to queryMetrics() every half second and 
copy all the metrics into the spark metrics.
Wondering if there is a better way?

Thanks!
-Yushu


MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.inNamespace(namespace))
.build());

for (MetricResult cc : metrics.getGauges()) {
LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
com.codahale.metrics.Gauge gauge =
new SimpleBeamGauge(cc.getAttempted().getValue());
try {
String name = metricName(cc.getKey(), addStepName, addNamespace);
if (registry.getNames().contains(name)) {
LOGGER.info("Removing metric {}", name);
registry.remove(name);
}
registry.register(name, gauge);
} catch (IllegalArgumentException e) {
LOGGER.warn("Duplicated metrics found. Try turning on 
addStepName=true.", e);
}
}





Re: Metrics in Beam+Spark

2022-07-14 Thread Yushu Yao
Thanks Moritz!

We are using JMX to ship the metrics out of Spark. Most of the Spark
built-in driver and executor metrics are going out fine.
Does this require us to make another sink?

-Yushu


On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack  wrote:

> Hi Yushu,
>
>
>
> Wondering, how did you configure your Spark metrics sink? And what version
> of Spark are you using?
>
>
>
> Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
>
>
> "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
>
>
>
> Currently there’s support for CSV and Graphite, but it’s simple to support
> others. The sinks typically wrap the corresponding Spark sinks and
> integrate with the Metrics registry of the Spark metric system.
>
>
>
>
> https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html
>
>
>
> When dealing with custom sinks, it’s fairly common to run into classpath
> issues :/ The metric system is loaded when an executor starts up, by then
> the application classpath isn’t available yet. Typically, that means
> distributing such code by other means.
>
>
>
> /Moritz
>
>
>
>
>
> On 14.07.22, 21:26, "Yushu Yao"  wrote:
>
>
>
>
> Hi Team,
>
> Does anyone have a working example of a beam job running on top of spark?
> So that I can use the beam metric syntax and the metrics will be shipped
> out via spark's infra?
>
>
>
> The only thing I achieved is to be able to queryMetrics() every half
> second and copy all the metrics into the spark metrics.
>
> Wondering if there is a better way?
>
>
>
> Thanks!
>
> -Yushu
>
>
>
> MetricQueryResults metrics =
> pipelineResult
> .metrics()
> .queryMetrics(
> MetricsFilter.*builder*()
> .addNameFilter(MetricNameFilter.*inNamespace*(namespace))
> .build());
>
> for (MetricResult cc : metrics.getGauges()) {
> *LOGGER*.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
> com.codahale.metrics.Gauge gauge =
> new SimpleBeamGauge(cc.getAttempted().getValue());
> try {
> String name = metricName(cc.getKey(), addStepName, addNamespace);
> if (registry.getNames().contains(name)) {
> *LOGGER*.info("Removing metric {}", name);
> registry.remove(name);
> }
> registry.register(name, gauge);
> } catch (IllegalArgumentException e) {
> *LOGGER*.warn("Duplicated metrics found. Try turning on 
> addStepName=true.", e);
> }
> }
>
>
>
>


Re: Metrics in Beam+Spark

2022-07-14 Thread Moritz Mack
Hi Yushu,

Wondering, how did you configure your Spark metrics sink? And what version of 
Spark are you using?

Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
"spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"

Currently there’s support for CSV and Graphite, but it’s simple to support 
others. The sinks typically wrap the corresponding Spark sinks and integrate 
with the Metrics registry of the Spark metric system.

https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html

When dealing with custom sinks, it’s fairly common to run into classpath issues 
:/ The metric system is loaded when an executor starts up, by then the 
application classpath isn’t available yet. Typically, that means distributing 
such code by other means.

/Moritz


On 14.07.22, 21:26, "Yushu Yao"  wrote:

Hi Team,
Does anyone have a working example of a beam job running on top of spark? So 
that I can use the beam metric syntax and the metrics will be shipped out via 
spark's infra?

The only thing I achieved is to be able to queryMetrics() every half second and 
copy all the metrics into the spark metrics.
Wondering if there is a better way?

Thanks!
-Yushu


MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.inNamespace(namespace))
.build());

for (MetricResult cc : metrics.getGauges()) {
LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
com.codahale.metrics.Gauge gauge =
new SimpleBeamGauge(cc.getAttempted().getValue());
try {
String name = metricName(cc.getKey(), addStepName, addNamespace);
if (registry.getNames().contains(name)) {
LOGGER.info("Removing metric {}", name);
registry.remove(name);
}
registry.register(name, gauge);
} catch (IllegalArgumentException e) {
LOGGER.warn("Duplicated metrics found. Try turning on 
addStepName=true.", e);
}
}





Metrics in Beam+Spark

2022-07-14 Thread Yushu Yao
Hi Team,
Does anyone have a working example of a Beam job running on top of Spark,
so that I can use the Beam metric syntax and have the metrics shipped
out via Spark's infra?

The only thing I achieved is to be able to queryMetrics() every half second
and copy all the metrics into the spark metrics.
Wondering if there is a better way?

Thanks!
-Yushu

// Query all Beam gauges in the given namespace from the PipelineResult...
MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());

// ...and mirror each gauge into the Dropwizard registry that the Spark metrics system reads from.
for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
  LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
  com.codahale.metrics.Gauge gauge =
      new SimpleBeamGauge(cc.getAttempted().getValue());
  try {
    String name = metricName(cc.getKey(), addStepName, addNamespace);
    if (registry.getNames().contains(name)) {
      LOGGER.info("Removing metric {}", name);
      registry.remove(name);
    }
    registry.register(name, gauge);
  } catch (IllegalArgumentException e) {
    LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
  }
}


Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Moritz Mack
Hi Yushu,

Have a look at org.apache.beam.runners.spark.translation.EvaluationContext in 
the Spark runner. It maintains that mapping between PCollections and RDDs 
(wrapped in the BoundedDataset helper). As Reuven just pointed out, values are 
timestamped (and windowed) in Beam, therefore BoundedDataset expects a 
JavaRDD<WindowedValue<T>>.
The idea is to map your external RDD to a new PCollection 
(PCollection.createPrimitiveOutputInternal) in the EvaluationContext (and vice 
versa). You can then apply Beam transforms to that PCollection (and with that 
effectively to the mapped RDD) as you are used to. Obviously, there’s a few 
steps necessary as EvaluationContext isn’t easily accessible from the outside.

Just having a quick look, creating a RddSource for Spark RDDs seems also not 
too bad. That would allow you to do something like this:
pipeline.apply(Read.from(new RddSource<>(javaRdd.rdd(), coder)));

Though I haven’t done much testing beyond a quick experiment. One notable 
disadvantage of that approach is that all RDD partition data must be 
broadcast to all workers to then pick the right partition. This should mostly 
be fine, but some types of partitions carry data as well …
https://gist.github.com/mosche/5c1ef8ba281a9a08df1ec67fac700d03

/Moritz

On 24.05.22, 16:46, "Yushu Yao"  wrote:

Looks like it's a valid use case.
Wondering anyone can give some high level guidelines on how to implement this?
I can give it a try.

-Yushu


On Tue, May 24, 2022 at 2:42 AM Jan Lukavský 
mailto:je...@seznam.cz>> wrote:

+dev@beam<mailto:d...@beam.apache.org>
On 5/24/22 11:40, Jan Lukavský wrote:

Hi,
I think this feature is valid. Every runner for which Beam is not a 'native' 
SDK uses some form of translation context, which maps PCollection to internal 
representation of the particular SDK of the runner (RDD in this case). It 
should be possible to "import" an RDD into the specific runner via something 
like

  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the point, actually. 
This would enable using features and libraries, that Beam does not have, or 
micro-optimize some particular step using runner-specific features, that we 
don't have in Beam. We actually had this feature (at least in a prototype) many 
years ago when Euphoria was a separate project.

 Jan
On 5/23/22 20:58, Alexey Romanenko wrote:



On 23 May 2022, at 20:40, Brian Hulette 
mailto:bhule...@google.com>> wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's worth 
considering building some Spark runner-specific feature around this, or at 
least packaging up Robert's proposed solution?

I’m not sure that a runner specific feature is a good way to do this since the 
other runners won’t be able to support it or I’m missing something?

There could be other interesting integrations in this space too, e.g. using 
Spark RDDs as a cache for Interactive Beam.

Another option could be to add something like SparkIO (or FlinkIO/whatever) to 
read/write data from/to Spark data structures for such cases (Spark schema to 
Beam schema convention also could be supported). And dreaming a bit more, for 
those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors 
could support the push-downs of pure Spark pipelines and then use the result 
downstream in Beam.

—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
mailto:rober...@google.com>> wrote:
The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
mailto:yao.yu...@gmail.com>> wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But assume I only use the 
> spark runner.
>
> I have a spark library (very complex) that emits a spark dataframe (or RDD).
> I also have an existing complex beam pipeline that can do post processing on 
> the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with. The question is, 
> how can I convert a spark RDD into a pcollection?
>
> Thanks
> -Yushu
>






Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský
Yes, I suppose it might be more complex than the code snippet, that was 
just to demonstrate the idea. Also the "exportRDD" would probably return 
WindowedValue instead of plain T.


On 5/24/22 17:23, Reuven Lax wrote:
Something like this seems reasonable. Beam PCollections also have a 
timestamp associated with every element, so the importRDD function 
probably needs a way to specify the timestamp (could be an attribute 
name for dataframes or a timestamp extraction function for regular RDDs).


On Tue, May 24, 2022 at 2:40 AM Jan Lukavský  wrote:

Hi,
I think this feature is valid. Every runner for which Beam is not
a 'native' SDK uses some form of translation context, which maps
PCollection to internal representation of the particular SDK of
the runner (RDD in this case). It should be possible to "import"
an RDD into the specific runner via something like

  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the
point, actually. This would enable using features and libraries,
that Beam does not have, or micro-optimize some particular step
using runner-specific features, that we don't have in Beam. We
actually had this feature (at least in a prototype) many years ago
when Euphoria was a separate project.

 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's
worth considering building some Spark runner-specific feature
around this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner specific feature is a good way to do
this since the other runners won’t be able to support it or I’m
missing something?


There could be other interesting integrations in this space too,
e.g. using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or
FlinkIO/whatever) to read/write data from/to Spark data
structures for such cases (Spark schema to Beam schema convention
also could be supported). And dreaming a bit more, for those who
need to have a mixed pipeline (e.g. Spark + Beam) such connectors
could support the push-downs of pure Spark pipelines and then use
the result downstream in Beam.

—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw
 wrote:

The easiest way to do this would be to write the RDD
somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao
 wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But
assume I only use the spark runner.
>
> I have a spark library (very complex) that emits a spark
dataframe (or RDD).
> I also have an existing complex beam pipeline that can do
post processing on the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with.
The question is, how can I convert a spark RDD into a
pcollection?
>
> Thanks
> -Yushu
>



Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Yushu Yao
Looks like it's a valid use case.
Wondering if anyone can give some high-level guidelines on how to implement
this?
I can give it a try.

-Yushu


On Tue, May 24, 2022 at 2:42 AM Jan Lukavský  wrote:

> +dev@beam 
> On 5/24/22 11:40, Jan Lukavský wrote:
>
> Hi,
> I think this feature is valid. Every runner for which Beam is not a
> 'native' SDK uses some form of translation context, which maps PCollection
> to internal representation of the particular SDK of the runner (RDD in this
> case). It should be possible to "import" an RDD into the specific runner
> via something like
>
>   SparkRunner runner = ;
>   PCollection<...> pCollection = runner.importRDD(rdd);
>
> and similarly
>
>   RDD<...> rdd = runner.exportRDD(pCollection);
>
> Yes, apparently this would be runner specific, but that is the point,
> actually. This would enable using features and libraries, that Beam does
> not have, or micro-optimize some particular step using runner-specific
> features, that we don't have in Beam. We actually had this feature (at
> least in a prototype) many years ago when Euphoria was a separate project.
>
>  Jan
> On 5/23/22 20:58, Alexey Romanenko wrote:
>
>
>
> On 23 May 2022, at 20:40, Brian Hulette  wrote:
>
> Yeah I'm not sure of any simple way to do this. I wonder if it's worth
> considering building some Spark runner-specific feature around this, or at
> least packaging up Robert's proposed solution?
>
>
> I’m not sure that a runner specific feature is a good way to do this since
> the other runners won’t be able to support it or I’m missing something?
>
> There could be other interesting integrations in this space too, e.g.
> using Spark RDDs as a cache for Interactive Beam.
>
>
> Another option could be to add something like SparkIO (or
> FlinkIO/whatever) to read/write data from/to Spark data structures for such
> cases (Spark schema to Beam schema convention also could be supported). And
> dreaming a bit more, for those who need to have a mixed pipeline (e.g.
> Spark + Beam) such connectors could support the push-downs of pure Spark
> pipelines and then use the result downstream in Beam.
>
> —
> Alexey
>
>
>
> Brian
>
> On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
> wrote:
>
>> The easiest way to do this would be to write the RDD somewhere then
>> read it from Beam.
>>
>> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  wrote:
>> >
>> > Hi Folks,
>> >
>> > I know this is not the optimal way to use beam :-) But assume I only
>> use the spark runner.
>> >
>> > I have a spark library (very complex) that emits a spark dataframe (or
>> RDD).
>> > I also have an existing complex beam pipeline that can do post
>> processing on the data inside the dataframe.
>> >
>> > However, the beam part needs a pcollection to start with. The question
>> is, how can I convert a spark RDD into a pcollection?
>> >
>> > Thanks
>> > -Yushu
>> >
>>
>
>


Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský

+dev@beam <mailto:d...@beam.apache.org>

On 5/24/22 11:40, Jan Lukavský wrote:


Hi,
I think this feature is valid. Every runner for which Beam is not a 
'native' SDK uses some form of translation context, which maps 
PCollection to internal representation of the particular SDK of the 
runner (RDD in this case). It should be possible to "import" an RDD 
into the specific runner via something like


  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the point, 
actually. This would enable using features and libraries, that Beam 
does not have, or micro-optimize some particular step using 
runner-specific features, that we don't have in Beam. We actually had 
this feature (at least in a prototype) many years ago when Euphoria 
was a separate project.


 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's 
worth considering building some Spark runner-specific feature around 
this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner specific feature is a good way to do this 
since the other runners won’t be able to support it or I’m missing 
something?


There could be other interesting integrations in this space too, 
e.g. using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or 
FlinkIO/whatever) to read/write data from/to Spark data structures 
for such cases (Spark schema to Beam schema convention also could be 
supported). And dreaming a bit more, for those who need to have a 
mixed pipeline (e.g. Spark + Beam) such connectors could support the 
push-downs of pure Spark pipelines and then use the result downstream 
in Beam.


—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
 wrote:


The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But assume
I only use the spark runner.
>
> I have a spark library (very complex) that emits a spark
dataframe (or RDD).
> I also have an existing complex beam pipeline that can do post
processing on the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with. The
question is, how can I convert a spark RDD into a pcollection?
>
> Thanks
> -Yushu
>



Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský

Hi,
I think this feature is valid. Every runner for which Beam is not a 
'native' SDK uses some form of translation context, which maps 
PCollection to internal representation of the particular SDK of the 
runner (RDD in this case). It should be possible to "import" an RDD into 
the specific runner via something like


  SparkRunner runner = ;
  PCollection<...> pCollection = runner.importRDD(rdd);

and similarly

  RDD<...> rdd = runner.exportRDD(pCollection);

Yes, apparently this would be runner specific, but that is the point, 
actually. This would enable using features and libraries, that Beam does 
not have, or micro-optimize some particular step using runner-specific 
features, that we don't have in Beam. We actually had this feature (at 
least in a prototype) many years ago when Euphoria was a separate project.


 Jan

On 5/23/22 20:58, Alexey Romanenko wrote:




On 23 May 2022, at 20:40, Brian Hulette  wrote:

Yeah I'm not sure of any simple way to do this. I wonder if it's 
worth considering building some Spark runner-specific feature around 
this, or at least packaging up Robert's proposed solution?


I’m not sure that a runner specific feature is a good way to do this 
since the other runners won’t be able to support it or I’m missing 
something?


There could be other interesting integrations in this space too, e.g. 
using Spark RDDs as a cache for Interactive Beam.


Another option could be to add something like SparkIO (or 
FlinkIO/whatever) to read/write data from/to Spark data structures for 
such cases (Spark schema to Beam schema convention also could be 
supported). And dreaming a bit more, for those who need to have a 
mixed pipeline (e.g. Spark + Beam) such connectors could support the 
push-downs of pure Spark pipelines and then use the result downstream 
in Beam.


—
Alexey




Brian

On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
 wrote:


The easiest way to do this would be to write the RDD somewhere then
read it from Beam.

On Mon, May 23, 2022 at 9:39 AM Yushu Yao 
wrote:
>
> Hi Folks,
>
> I know this is not the optimal way to use beam :-) But assume I
only use the spark runner.
>
> I have a spark library (very complex) that emits a spark
dataframe (or RDD).
> I also have an existing complex beam pipeline that can do post
processing on the data inside the dataframe.
>
> However, the beam part needs a pcollection to start with. The
question is, how can I convert a spark RDD into a pcollection?
>
> Thanks
> -Yushu
>



Re: RDD (Spark dataframe) into a PCollection?

2022-05-23 Thread Alexey Romanenko


> On 23 May 2022, at 20:40, Brian Hulette  wrote:
> 
> Yeah I'm not sure of any simple way to do this. I wonder if it's worth 
> considering building some Spark runner-specific feature around this, or at 
> least packaging up Robert's proposed solution? 

I’m not sure that a runner specific feature is a good way to do this since the 
other runners won’t be able to support it or I’m missing something?

> There could be other interesting integrations in this space too, e.g. using 
> Spark RDDs as a cache for Interactive Beam.

Another option could be to add something like SparkIO (or FlinkIO/whatever) to 
read/write data from/to Spark data structures for such cases (Spark schema to 
Beam schema convention also could be supported). And dreaming a bit more, for 
those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors 
could support the push-downs of pure Spark pipelines and then use the result 
downstream in Beam.

—
Alexey


> 
> Brian
> 
> On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw  <mailto:rober...@google.com>> wrote:
> The easiest way to do this would be to write the RDD somewhere then
> read it from Beam.
> 
> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  <mailto:yao.yu...@gmail.com>> wrote:
> >
> > Hi Folks,
> >
> > I know this is not the optimal way to use beam :-) But assume I only use 
> > the spark runner.
> >
> > I have a spark library (very complex) that emits a spark dataframe (or RDD).
> > I also have an existing complex beam pipeline that can do post processing 
> > on the data inside the dataframe.
> >
> > However, the beam part needs a pcollection to start with. The question is, 
> > how can I convert a spark RDD into a pcollection?
> >
> > Thanks
> > -Yushu
> >



Re: RDD (Spark dataframe) into a PCollection?

2022-05-23 Thread Yushu Yao
Thanks Robert and Brian.
As for "writing the RDD somewhere", I can totally write a bunch of files on
disk/s3. Any other options?
-Yushu


On Mon, May 23, 2022 at 11:40 AM Brian Hulette  wrote:

> Yeah I'm not sure of any simple way to do this. I wonder if it's worth
> considering building some Spark runner-specific feature around this, or at
> least packaging up Robert's proposed solution?
>
> There could be other interesting integrations in this space too, e.g.
> using Spark RDDs as a cache for Interactive Beam.
>
> Brian
>
> On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw 
> wrote:
>
>> The easiest way to do this would be to write the RDD somewhere then
>> read it from Beam.
>>
>> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  wrote:
>> >
>> > Hi Folks,
>> >
>> > I know this is not the optimal way to use beam :-) But assume I only
>> use the spark runner.
>> >
>> > I have a spark library (very complex) that emits a spark dataframe (or
>> RDD).
>> > I also have an existing complex beam pipeline that can do post
>> processing on the data inside the dataframe.
>> >
>> > However, the beam part needs a pcollection to start with. The question
>> is, how can I convert a spark RDD into a pcollection?
>> >
>> > Thanks
>> > -Yushu
>> >
>>
>


Re: RDD (Spark dataframe) into a PCollection?

2022-05-23 Thread Alexey Romanenko
To add a bit more to what Robert suggested. Right, in general we can’t read 
Spark RDDs directly with Beam (the Spark runner uses RDDs under the hood, but that’s a 
different story), but you can write the results to any storage, in a data 
format that Beam supports, and then read them with a corresponding Beam IO 
connector.

—
Alexey
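
As a rough illustration of that hand-off, sketched with the Python SDK for brevity (the Java TextIO/ParquetIO connectors follow the same pattern); the bucket path and format here are placeholders:

```
import apache_beam as beam

# Spark side (PySpark), executed in the Spark job that produces the DataFrame:
#   df.write.parquet('s3://my-bucket/handoff/')

# Beam side: read the materialized files with a matching IO connector.
with beam.Pipeline() as p:
    _ = (
        p
        | 'Read handoff' >> beam.io.ReadFromParquet('s3://my-bucket/handoff/*')
        | beam.Map(print))  # each element is a dict keyed by column name
```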

> On 23 May 2022, at 20:35, Robert Bradshaw  wrote:
> 
> The easiest way to do this would be to write the RDD somewhere then
> read it from Beam.
> 
> On Mon, May 23, 2022 at 9:39 AM Yushu Yao  wrote:
>> 
>> Hi Folks,
>> 
>> I know this is not the optimal way to use beam :-) But assume I only use the 
>> spark runner.
>> 
>> I have a spark library (very complex) that emits a spark dataframe (or RDD).
>> I also have an existing complex beam pipeline that can do post processing on 
>> the data inside the dataframe.
>> 
>> However, the beam part needs a pcollection to start with. The question is, 
>> how can I convert a spark RDD into a pcollection?
>> 
>> Thanks
>> -Yushu
>> 



RDD (Spark dataframe) into a PCollection?

2022-05-23 Thread Yushu Yao
Hi Folks,

I know this is not the optimal way to use beam :-) But assume I only use
the spark runner.

I have a spark library (very complex) that emits a spark dataframe (or
RDD).
I also have an existing complex beam pipeline that can do post processing
on the data inside the dataframe.

However, the beam part needs a pcollection to start with. The question is,
how can I convert a spark RDD into a pcollection?

Thanks
-Yushu


Re: [PROPOSAL] Stop Spark 2 support in Spark Runner

2022-04-29 Thread Austin Bennett
https://spark.apache.org/releases/spark-release-3-0-0.html

Since Spark 3 has been out almost 2 years, this seems increasingly
reasonable.

On Fri, Apr 29, 2022 at 4:04 AM Jean-Baptiste Onofré 
wrote:

> +1, it makes sense to me. Users wanting "old" spark version can take
> previous Beam releases.
>
> Regards
> JB
>
> On Fri, Apr 29, 2022 at 12:39 PM Alexey Romanenko
>  wrote:
> >
> > Any objections or comments from Spark 2 users on this topic?
> >
> > —
> > Alexey
> >
> >
> > On 20 Apr 2022, at 19:17, Alexey Romanenko 
> wrote:
> >
> > Hi everyone,
> >
> > A while ago, we already discussed on dev@ that there are several
> reasons to stop provide a support of Spark2 in Spark Runner (in all its
> variants that we have for now - RDD, Dataset, Portable) [1]. In two words,
> it brings some burden to Spark runner support that we would like to avoid
> in the future.
> >
> > From the devs perspective I don’t see any objections about this. So, I’d
> like to know if there are users that still uses Spark2 for their Beam
> pipelines and it will be critical for them to keep using it.
> >
> > Please, share any your opinion on this!
> >
> > —
> > Alexey
> >
> > [1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco
> >
> > > On 31 Mar 2022, at 17:51, Alexey Romanenko 
> wrote:
> > >
> > > Hi everyone,
> > >
> > > For the moment, Beam Spark Runner supports two versions of Spark - 2.x
> and 3.x.
> > >
> > > Taking into account the several things that:
> > > - almost all cloud providers already mostly moved to Spark 3.x as a
> main supported version;
> > > - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was
> done almost a year ago;
> > > - Spark 3 is considered as a mainstream Spark version for development
> and bug fixing;
> > > - better to avoid the burden of maintenance (there are some
> incompatibilities between Spark 2 and 3) of two versions;
> > >
> > > I’d suggest to stop support Spark 2 for the Spark Runner in the one of
> the next Beam releases.
> > >
> > > What are your thoughts on this? Are there any principal objections or
> reasons for not doing this that I probably missed?
> > >
> > > —
> > > Alexey
> > >
> > >
>


Re: [PROPOSAL] Stop Spark 2 support in Spark Runner

2022-04-29 Thread Jean-Baptiste Onofré
+1, it makes sense to me. Users wanting "old" spark version can take
previous Beam releases.

Regards
JB

On Fri, Apr 29, 2022 at 12:39 PM Alexey Romanenko
 wrote:
>
> Any objections or comments from Spark 2 users on this topic?
>
> —
> Alexey
>
>
> On 20 Apr 2022, at 19:17, Alexey Romanenko  wrote:
>
> Hi everyone,
>
> A while ago, we already discussed on dev@ that there are several reasons to 
> stop provide a support of Spark2 in Spark Runner (in all its variants that we 
> have for now - RDD, Dataset, Portable) [1]. In two words, it brings some 
> burden to Spark runner support that we would like to avoid in the future.
>
> From the devs perspective I don’t see any objections about this. So, I’d like 
> to know if there are users that still uses Spark2 for their Beam pipelines 
> and it will be critical for them to keep using it.
>
> Please, share any your opinion on this!
>
> —
> Alexey
>
> [1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco
>
> > On 31 Mar 2022, at 17:51, Alexey Romanenko  wrote:
> >
> > Hi everyone,
> >
> > For the moment, Beam Spark Runner supports two versions of Spark - 2.x and 
> > 3.x.
> >
> > Taking into account the several things that:
> > - almost all cloud providers already mostly moved to Spark 3.x as a main 
> > supported version;
> > - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was done 
> > almost a year ago;
> > - Spark 3 is considered as a mainstream Spark version for development and 
> > bug fixing;
> > - better to avoid the burden of maintenance (there are some 
> > incompatibilities between Spark 2 and 3) of two versions;
> >
> > I’d suggest to stop support Spark 2 for the Spark Runner in the one of the 
> > next Beam releases.
> >
> > What are your thoughts on this? Are there any principal objections or 
> > reasons for not doing this that I probably missed?
> >
> > —
> > Alexey
> >
> >


[PROPOSAL] Stop Spark 2 support in Spark Runner

2022-04-29 Thread Alexey Romanenko
Any objections or comments from Spark 2 users on this topic?

—
Alexey


On 20 Apr 2022, at 19:17, Alexey Romanenko  wrote:

Hi everyone,

A while ago, we already discussed on dev@ that there are several reasons to 
stop providing support for Spark 2 in the Spark Runner (in all its variants that we 
have for now - RDD, Dataset, Portable) [1]. In short, it adds a maintenance burden 
to the Spark runner that we would like to avoid in the future.

From the dev perspective I don’t see any objections to this. So, I’d like 
to know whether there are users who still use Spark 2 for their Beam pipelines and 
for whom it would be critical to keep using it.

Please share your opinion on this!

—
Alexey

[1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco

> On 31 Mar 2022, at 17:51, Alexey Romanenko  wrote:
> 
> Hi everyone,
> 
> For the moment, Beam Spark Runner supports two versions of Spark - 2.x and 
> 3.x. 
> 
> Taking into account the several things that:
> - almost all cloud providers already mostly moved to Spark 3.x as a main 
> supported version;
> - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was done 
> almost a year ago;
> - Spark 3 is considered as a mainstream Spark version for development and bug 
> fixing;
> - better to avoid the burden of maintenance (there are some incompatibilities 
> between Spark 2 and 3) of two versions; 
> 
> I’d suggest to stop support Spark 2 for the Spark Runner in the one of the 
> next Beam releases. 
> 
> What are your thoughts on this? Are there any principal objections or reasons 
> for not doing this that I probably missed?
> 
> —
> Alexey 
> 
> 


Re: [PROPOSAL] Stop Spark2 support in Spark Runner

2022-04-20 Thread Alexey Romanenko
Hi everyone,

A while ago, we already discussed on dev@ that there are several reasons to 
stop providing support for Spark 2 in the Spark Runner (in all its variants that we 
have for now - RDD, Dataset, Portable) [1]. In short, it adds a maintenance burden 
to the Spark runner that we would like to avoid in the future.

From the dev perspective I don’t see any objections to this. So, I’d like 
to know whether there are users who still use Spark 2 for their Beam pipelines and 
for whom it would be critical to keep using it.

Please share your opinion on this!

—
Alexey

[1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco

> On 31 Mar 2022, at 17:51, Alexey Romanenko  wrote:
> 
> Hi everyone,
> 
> For the moment, Beam Spark Runner supports two versions of Spark - 2.x and 
> 3.x. 
> 
> Taking into account the several things that:
> - almost all cloud providers already mostly moved to Spark 3.x as a main 
> supported version;
> - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was done 
> almost a year ago;
> - Spark 3 is considered as a mainstream Spark version for development and bug 
> fixing;
> - better to avoid the burden of maintenance (there are some incompatibilities 
> between Spark 2 and 3) of two versions; 
> 
> I’d suggest to stop support Spark 2 for the Spark Runner in the one of the 
> next Beam releases. 
> 
> What are your thoughts on this? Are there any principal objections or reasons 
> for not doing this that I probably missed?
> 
> —
> Alexey 
> 
> 



Re: Re: [Question] Spark: standard setup to use beam-spark to parallelize python code

2022-04-04 Thread Florian Pinault
We made some progress parallelizing our Python code using beam-spark. 
Following your advice, we are using Spark 3.2.1.
The Spark server and worker are connected OK.
On a third machine, the client machine, I am running the Docker job server:
$ sudo docker run --net=host apache/beam_spark_job_server:latest 
--spark-master-url=spark://:7077

Then on the client:
$ python test_beam.py

If it matters, the code in test_beam.py has the following:
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--save_main_session",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.8_sdk:2.37.0"
])
with beam.Pipeline(options=options) as p:
    lines = (p
             | 'Create words' >> beam.Create(['this is working'])
             | 'Add hostname' >> beam.Map(lambda words: addhost(words))
             | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
             | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
             | 'Group' >> beam.GroupBy()  # Do future batching here
             | 'print output' >> beam.Map(myprint)
             )


I think I got the versions wrong, because the server logs give the following 
(192.168.1.252 is the client IP):

22/04/04 11:51:57 DEBUG TransportServer: New connection accepted for remote 
address /192.168.1.252:53330.
22/04/04 11:51:57 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; 
local class incompatible: stream classdesc serialVersionUID = 
6543101073799644159, local class serialVersionUID = 1574364215946805297
….


On the client logs, I got:

22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Staging 
artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Resolving 
artifacts for 
job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.ref_Environment_default_environment_1.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Getting 1 
artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.null.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Artifacts 
fully staged for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.
22/04/04 11:51:56 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking 
job BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038
22/04/04 11:51:56 INFO org.apache.beam.runners.jobsubmission.JobInvocation: 
Starting job invocation 
BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038
22/04/04 11:51:56 INFO 
org.apache.beam.runners.core.construction.resources.PipelineResources: 
PipelineOptions.filesToStage was not specified. Defaulting to files from the 
classpath: will stage 6 files. Enable logging at DEBUG level to see which files 
will be staged.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a brand 
new Spark Context.
22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Your hostname, 
spark-ml-client resolves to a loopback address: 127.0.0.1; using 192.168.1.252 
instead (on interface eth0)
22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you 
need to bind to another address
22/04/04 11:51:57 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
22/04/04 11:52:57 ERROR 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application has 
been killed. Reason: All masters are unresponsive! Giving up.
22/04/04 11:52:57 WARN 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application ID 
is not initialized yet.
22/04/04 11:52:57 WARN 
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint: Drop 
UnregisterApplication(null) because has not yet connected to master
22/04/04 11:52:57 WARN org.apache.spark.metrics.MetricsSystem: Stopping a 
MetricsSystem that is not running
22/04/04 11:52:57 ERROR org.apache.spark.SparkContext: Error initializing 
SparkContext.
java.lang.NullPointerException
at 
org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:64)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:248)
at org.apache.spark.SparkContext.(SparkContext.scala:510)
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58)
at 
org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
at 
org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)

Re: [Question] Spark: standard setup to use beam-spark to parallelize python code

2022-03-28 Thread Alexey Romanenko

> On 28 Mar 2022, at 20:58, Mihai Alexe  wrote:
> 
> the jackson runtime dependencies should be updated manually (at least to 
> 2.9.2) in case of using Spark 2.x
>  
> yes - that is exactly what we are looking to achieve, any hints about how to 
> do that? We’re not Java experts. Do you happen to have a CI recipe or binary 
> lis for this particular configuration? Thank you!

For our testing pipelines, which run on Spark 2.4.7, we just build them with a 
recent version of the Jackson libs [1]. Though, IIUC, you don’t build any Java 
code. So, what exactly is the issue that you are facing? 

> use Spark 3..x if possible since it already provides jackson jars of version 
> 2.10.0.
>  
> we tried this too but ran into other compatibility problems. Seems that the 
> Beam Spark runner (in v 2.37.0) only supports the Spark 2.x branch, as per 
> the Beam docs https://beam.apache.org/documentation/runners/spark/ 
> <https://beam.apache.org/documentation/runners/spark/>
Which exact Spark 3.x version did you try? AFAICT, Beam 2.37.0 supports and 
was tested with Spark 3.1.2 / Scala 2.12 artifacts.

—
Alexey

[1] 
https://github.com/Talend/beam-samples/blob/9288606495b9ba8f77383cd9709ed9b5783deeb8/pom.xml#L66

>  
> any ideas?
>  
> On 2022/03/28 17:38:13 Alexey Romanenko wrote:
> > Well, it’s caused by recent jackson's version update in Beam [1] - so, the 
> > jackson runtime dependencies should be updated manually (at least to 2.9.2) 
> > in case of using Spark 2.x.
> > 
> > Either, use Spark 3..x if possible since it already provides jackson jars 
> > of version 2.10.0.
> >  
> > [1] 
> > https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c
> >  
> > <https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c><https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c>
> >  
> > <https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c%3e>
> > 
> > —
> > Alexey
> > 
> > > On 28 Mar 2022, at 14:15, Florian Pinault  > > <mailto:fl...@ecmwf.int>> wrote:
> > > 
> > > Greetings,
> > >  
> > > We are setting up an Apache Beam cluster using Spark as a backend to run 
> > > python code. This is currently a toy example with 4 virtual machines 
> > > running Centos (a client, a spark main, and two spark-workers).
> > > We are running into version issues (detail below) and would need help on 
> > > which versions to set up.
> > > We currently are trying spark-2.4.8-bin-hadoop2.7, with the pip package 
> > > beam 2.37.0 on the client, and using a job-server to create docker image.
> > >  
> > > I saw here https://beam.apache.org/blog/beam-2.33.0/ 
> > > <https://beam.apache.org/blog/beam-2.33.0/> 
> > > <https://beam.apache.org/blog/beam-2.33.0/> 
> > > <https://beam.apache.org/blog/beam-2.33.0/%3e> that "Spark 2.x users will 
> > > need to update Spark's Jackson runtime dependencies 
> > > (spark.jackson.version) to at least version 2.9.2, due to Beam updating 
> > > its dependencies." 
> > >  But it looks like the jackson-core version in the job-server is 2.13.0 
> > > whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are
> > > -. 1 mluser mluser 46986 May 8 2021 jackson-annotations-2.6.7.jar
> > > -. 1 mluser mluser 258919 May 8 2021 jackson-core-2.6.7.jar
> > > -. 1 mluser mluser 232248 May 8 2021 jackson-core-asl-1.9.13.jar
> > > -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> > > -. 1 mluser mluser 320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> > > -. 1 mluser mluser 18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> > > -. 1 mluser mluser 780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> > > -. 1 mluser mluser 32612 May 8 2021 
> > > jackson-module-jaxb-annotations-2.6.7.jar
> > > -. 1 mluser mluser 42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> > > -. 1 mluser mluser 515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
> > >  
> > > There must be something to update, but I am not sure how to update these 
> > > jar files with their dependencies, and not sure if this would get us very 
> > > far.
> > >  
> > > Would you have a list of binaries that work together or some running CI 
> > > from the apache foundation similar to what we are trying to achieve?
> > 
> > 



RE: Re: [Question] Spark: standard setup to use beam-spark to parallelize python code

2022-03-28 Thread Mihai Alexe
  *   the jackson runtime dependencies should be updated manually (at least to 
2.9.2) in case of using Spark 2.x

yes - that is exactly what we are looking to achieve, any hints about how to do 
that? We’re not Java experts. Do you happen to have a CI recipe or binary list 
for this particular configuration? Thank you!


  *   use Spark 3..x if possible since it already provides jackson jars of 
version 2.10.0.

we tried this too but ran into other compatibility problems. Seems that the 
Beam Spark runner (in v 2.37.0) only supports the Spark 2.x branch, as per the 
Beam docs https://beam.apache.org/documentation/runners/spark/

any ideas?

On 2022/03/28 17:38:13 Alexey Romanenko wrote:
> Well, it’s caused by recent jackson's version update in Beam [1] - so, the 
> jackson runtime dependencies should be updated manually (at least to 2.9.2) 
> in case of using Spark 2.x.
>
> Either, use Spark 3..x if possible since it already provides jackson jars of 
> version 2.10.0.
>
> [1] 
> https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c
>  
> <https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c><https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c%3e>
>
> —
> Alexey
>
> > On 28 Mar 2022, at 14:15, Florian Pinault 
> > mailto:fl...@ecmwf.int>> wrote:
> >
> > Greetings,
> >
> > We are setting up an Apache Beam cluster using Spark as a backend to run 
> > python code. This is currently a toy example with 4 virtual machines 
> > running Centos (a client, a spark main, and two spark-workers).
> > We are running into version issues (detail below) and would need help on 
> > which versions to set up.
> > We currently are trying spark-2.4.8-bin-hadoop2.7, with the pip package 
> > beam 2.37.0 on the client, and using a job-server to create docker image.
> >
> > I saw here https://beam.apache.org/blog/beam-2.33.0/ 
> > <https://beam.apache.org/blog/beam-2.33.0/><https://beam.apache.org/blog/beam-2.33.0/%3e>
> >  that "Spark 2.x users will need to update Spark's Jackson runtime 
> > dependencies (spark.jackson.version) to at least version 2.9.2, due to Beam 
> > updating its dependencies."
> >  But it looks like the jackson-core version in the job-server is 2.13.0 
> > whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are
> > -. 1 mluser mluser 46986 May 8 2021 jackson-annotations-2.6.7.jar
> > -. 1 mluser mluser 258919 May 8 2021 jackson-core-2.6.7.jar
> > -. 1 mluser mluser 232248 May 8 2021 jackson-core-asl-1.9.13.jar
> > -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> > -. 1 mluser mluser 320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> > -. 1 mluser mluser 18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> > -. 1 mluser mluser 780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> > -. 1 mluser mluser 32612 May 8 2021 
> > jackson-module-jaxb-annotations-2.6.7.jar
> > -. 1 mluser mluser 42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> > -. 1 mluser mluser 515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
> >
> > There must be something to update, but I am not sure how to update these 
> > jar files with their dependencies, and not sure if this would get us very 
> > far.
> >
> > Would you have a list of binaries that work together or some running CI 
> > from the apache foundation similar to what we are trying to achieve?
>
>


Re: [Question] Spark: standard setup to use beam-spark to parallelize python code

2022-03-28 Thread Alexey Romanenko
Well, it’s caused by the recent Jackson version update in Beam [1] - so the 
Jackson runtime dependencies should be updated manually (at least to 2.9.2) if 
you are using Spark 2.x. 

Alternatively, use Spark 3.x if possible, since it already provides Jackson jars of 
version 2.10.0.
 
[1] 
https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c 
<https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c>

—
Alexey

> On 28 Mar 2022, at 14:15, Florian Pinault  wrote:
> 
> Greetings,
>  
> We are setting up an Apache Beam cluster using Spark as a backend to run 
> python code. This is currently a toy example with 4 virtual machines running 
> Centos (a client, a spark main, and two spark-workers). 
> We are running into version issues (detail below) and would need help on 
> which versions to set up.
> We currently are trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 
> 2.37.0 on the client, and using a job-server to create docker image.
>  
> I saw here https://beam.apache.org/blog/beam-2.33.0/ 
> <https://beam.apache.org/blog/beam-2.33.0/> that "Spark 2.x users will need 
> to update Spark's Jackson runtime dependencies (spark.jackson.version) to at 
> least version 2.9.2, due to Beam updating its dependencies." 
>  But it looks like the jackson-core version in the job-server is 2.13.0 
> whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are
> -. 1 mluser mluser 46986 May 8 2021 jackson-annotations-2.6.7.jar
> -. 1 mluser mluser 258919 May 8 2021 jackson-core-2.6.7.jar
> -. 1 mluser mluser 232248 May 8 2021 jackson-core-asl-1.9.13.jar
> -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> -. 1 mluser mluser 320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> -. 1 mluser mluser 18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> -. 1 mluser mluser 780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> -. 1 mluser mluser 32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
> -. 1 mluser mluser 42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> -. 1 mluser mluser 515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
>  
> There must be something to update, but I am not sure how to update these jar 
> files with their dependencies, and not sure if this would get us very far.
>  
> Would you have a list of binaries that work together or some running CI from 
> the apache foundation similar to what we are trying to achieve?



[Question] Spark: standard setup to use beam-spark to parallelize python code

2022-03-28 Thread Florian Pinault
Greetings,

We are setting up an Apache Beam cluster using Spark as a backend to run python 
code. This is currently a toy example with 4 virtual machines running Centos (a 
client, a spark main, and two spark-workers).
We are running into version issues (detail below) and would need help on which 
versions to set up.
We currently are trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 
2.37.0 on the client, and using a job-server to create docker image.


I saw here https://beam.apache.org/blog/beam-2.33.0/ that "Spark 2.x users will 
need to update Spark's Jackson runtime dependencies (spark.jackson.version) to 
at least version 2.9.2, due to Beam updating its dependencies."

 But it looks like the jackson-core version in the job-server is 2.13.0 whereas 
the jars in spark-2.4.8-bin-hadoop2.7/jars are

-. 1 mluser mluser 46986 May 8 2021 jackson-annotations-2.6.7.jar
-. 1 mluser mluser 258919 May 8 2021 jackson-core-2.6.7.jar
-. 1 mluser mluser 232248 May 8 2021 jackson-core-asl-1.9.13.jar
-. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
-. 1 mluser mluser 320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
-. 1 mluser mluser 18336 May 8 2021 jackson-jaxrs-1.9.13.jar
-. 1 mluser mluser 780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
-. 1 mluser mluser 32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
-. 1 mluser mluser 42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
-. 1 mluser mluser 515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar

There must be something to update, but I am not sure how to update these jar 
files with their dependencies, and not sure if this would get us very far.

Would you have a list of binaries that work together or some running CI from 
the apache foundation similar to what we are trying to achieve?



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-02 Thread Alexey Romanenko
Thank you for the quick answers, Utkarsh, but unfortunately I don’t see the real 
cause of this right now. It seems like it will require some remote debugging on 
your side to see what the workers are actually doing.


> On 1 Feb 2022, at 22:59, Utkarsh Parekh  wrote:
> 
> If you tested earlier with the same stack, which version did you use?
> 
> Can you enable debug logs to check what’s happening there? So far the 
> following warning was received from from log4j which I received from log4j on 
> Databricks (no errors other than that).
> 
> Can you make sure that there is no issue with firewall or something? No I 
> don't think so. Because it's working fine locally and databricks notebook.
> 
> Can you run this pipeline locally against a real Kafka server, not Azure 
> Event Hub, to make sure that it works fine? Yes it's working fine with both 
> Azure EventHub and Kafka
> 
> 
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
> at 
> org.springframework.expression.spel.support.StandardTypeConverter.(StandardTypeConverter.java:46)
> at 
> org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
> at 
> org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
> at 
> org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
> at 
> org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
> at 
> org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at 
> org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at 
> org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at 
> org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at 
> org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at 
> org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
> at 
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
> at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
> at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
> at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
> at 
> org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
> at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
> at 
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
> at org.apache.spark.storage.BlockManager.org 
> <http://org.apache.spark.storage.blockmanager.org/>$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
> at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at 
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at 
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)

Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Utkarsh Parekh
And I also get this error occasionally when I execute a streaming pipeline
with a new cluster instead of an existing cluster.

https://issues.apache.org/jira/browse/BEAM-12032?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

On Tue, Feb 1, 2022 at 1:59 PM Utkarsh Parekh 
wrote:

> If you tested earlier with the same stack, which version did you use?
>
> *Can you enable debug logs to check what’s happening there? *So far the
> following warning was received from from log4j which I received from log4j
> on Databricks (no errors other than that).
>
> *Can you make sure that there is no issue with firewall or something? *No
> I don't think so. Because it's working fine locally and databricks notebook.
>
> *Can you run this pipeline locally against a real Kafka server, not Azure
> Event Hub, to make sure that it works fine? *Yes it's working fine with
> both Azure EventHub and Kafka
>
>
>
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
> at
> org.springframework.expression.spel.support.StandardTypeConverter.(StandardTypeConverter.java:46)
> at
> org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
> at
> org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
> at
> org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
> at
> org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
> at
> org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at
> org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at
> org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at
> org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at
> org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at
> org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
> at
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
> at
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> at
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
> at
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
> at
> org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
> at
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
> at
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
> at
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultT

Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Utkarsh Parekh
If you tested earlier with the same stack, which version did you use?

*Can you enable debug logs to check what’s happening there? *So far, the
following warning was received from log4j on Databricks (no errors other than
that).

*Can you make sure that there is no issue with firewall or something? *No, I
don't think so, because it's working fine locally and in a Databricks notebook.

*Can you run this pipeline locally against a real Kafka server, not Azure
Event Hub, to make sure that it works fine? *Yes, it's working fine with
both Azure Event Hub and Kafka.


org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
at
org.springframework.expression.spel.support.StandardTypeConverter.(StandardTypeConverter.java:46)
at
org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
at
org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
at
org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
at
org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
at
org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
at
org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
at
org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at
org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
at
org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
at
org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
at
org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
at
org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
at
org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
at
org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
at
org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
at
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at
org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at
org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
at org.apache.spark.storage.BlockManager.org
$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
at
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:93)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run

Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Alexey Romanenko
Well, personally I didn’t test with this version, but it should be fine… 
Can you enable debug logs to check what’s happening there? 
Can you make sure that there is no issue with firewall or something? 
Can you run this pipeline locally against a real Kafka server, not Azure Event 
Hub, to make sure that it works fine?
Otherwise, it would require remote debugging of the worker process.

> On 1 Feb 2022, at 19:18, Utkarsh Parekh  wrote:
> 
> Sorry I sent the last message in a hurry. Here is the Beam java to kafka: Is 
> something missing here?
> 
> 
> org.apache.beam
> beam-sdks-java-io-kafka
> 2.35.0
> 
> 
> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh  <mailto:utkarsh.s.par...@gmail.com>> wrote:
> Here it is 
> 
> 
> org.apache.kafka
> kafka-clients
> 2.8.0
> 
> 
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hmm, this is strange. Which version of Kafka client do you use while running 
> it with Beam?
> 
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote:
>> 
>> Hi Alexey, 
>> 
>> First of all, thank you for the response! Yes I did have it in Consumer 
>> configuration and try to increase "session.timeout".
>> 
>> From consumer side so far I've following settings:
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms <http://request.timeout.ms/>", 6);
>> props.put("session.timeout.ms <http://session.timeout.ms/>", 6);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>> 
>> It works fine using following code in Databricks Notebook. The problem has 
>> been occurring when I run it through Apache beam and KafkaIO (Just providing 
>> more context if that may help you to understand problem)
>> 
>> val df = spark.readStream
>> .format("kafka")
>> .option("subscribe", TOPIC)
>> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>> .option("kafka.sasl.mechanism", "PLAIN")
>> .option("kafka.security.protocol", "SASL_SSL")
>> .option("kafka.sasl.jaas.config", EH_SASL)
>> .option("kafka.request.timeout.ms <http://kafka.request.timeout.ms/>", 
>> "6")
>> .option("kafka.session.timeout.ms <http://kafka.session.timeout.ms/>", 
>> "6")
>> .option("failOnDataLoss", "false")
>> //.option("kafka.group.id <http://kafka.group.id/>", "testsink")
>> .option("startingOffsets", "latest")
>> .load()
>> 
>> Utkarsh
>> 
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko > <mailto:aromanenko@gmail.com>> wrote:
>> Hi Utkarsh,
>> 
>> Can it be related to this configuration problem?
>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>  
>> <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received>
>> 
>> Did you check timeout settings?
>> 
>> —
>> Alexey   
>> 
>> 
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh >> <mailto:utkarsh.s.par...@gmail.com>> wrote:
>>> 
>>> Hello,
>>> 
>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying 
>>> to create a simple streaming app with Apache Beam, where it reads data from 
>>> an Azure event hub and produces messages into another Azure event hub. 
>>> 
>>> I'm creating and running spark jobs on Azure Databricks.
>>> 
>>> The problem is the consumer (uses SparkRunner) is not able to read data 
>>> from Event hub (queue). There is no activity and no errors on the Spark 
>>> cluster.
>>> 
>>> I would appreciate it if anyone could help to fix this issue.
>>> 
>>> Thank you
>>> 
>>> Utkarsh
>> 
> 



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Utkarsh Parekh
Sorry, I sent the last message in a hurry. Here is the Beam Java Kafka IO
dependency. Is something missing here?

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.35.0</version>
</dependency>


On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh 
wrote:

> Here it is
>
> 
> org.apache.kafka
> kafka-clients
> 2.8.0
> 
>
>
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko 
> wrote:
>
>> Hmm, this is strange. Which version of Kafka client do you use while
>> running it with Beam?
>>
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh 
>> wrote:
>>
>> Hi Alexey,
>>
>> First of all, thank you for the response! Yes I did have it in Consumer
>> configuration and try to increase "session.timeout".
>>
>> From consumer side so far I've following settings:
>>
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms", 6);
>> props.put("session.timeout.ms", 6);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>
>>
>> It works fine using following code in Databricks Notebook. The problem
>> has been occurring when I run it through Apache beam and KafkaIO (Just
>> providing more context if that may help you to understand problem)
>>
>> val df = spark.readStream
>> .format("kafka")
>> .option("subscribe", TOPIC)
>> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>> .option("kafka.sasl.mechanism", "PLAIN")
>> .option("kafka.security.protocol", "SASL_SSL")
>> .option("kafka.sasl.jaas.config", EH_SASL)
>> .option("kafka.request.timeout.ms", "6")
>> .option("kafka.session.timeout.ms", "6")
>> .option("failOnDataLoss", "false")
>> //.option("kafka.group.id", "testsink")
>> .option("startingOffsets", "latest")
>> .load()
>>
>> Utkarsh
>>
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko 
>> wrote:
>>
>>> Hi Utkarsh,
>>>
>>> Can it be related to this configuration problem?
>>>
>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>
>>> Did you check timeout settings?
>>>
>>> —
>>> Alexey
>>>
>>>
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
>>> trying to create a simple streaming app with Apache Beam, where it reads
>>> data from an Azure event hub and produces messages into another Azure event
>>> hub.
>>>
>>> I'm creating and running spark jobs on Azure Databricks.
>>>
>>> The problem is the consumer (uses SparkRunner) is not able to read data
>>> from Event hub (queue). There is no activity and no errors on the Spark
>>> cluster.
>>>
>>> I would appreciate it if anyone could help to fix this issue.
>>>
>>> Thank you
>>>
>>> Utkarsh
>>>
>>>
>>>
>>


Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Utkarsh Parekh
Here it is:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>


On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko 
wrote:

> Hmm, this is strange. Which version of Kafka client do you use while
> running it with Beam?
>
> On 1 Feb 2022, at 16:56, Utkarsh Parekh 
> wrote:
>
> Hi Alexey,
>
> First of all, thank you for the response! Yes I did have it in Consumer
> configuration and try to increase "session.timeout".
>
> From consumer side so far I've following settings:
>
> props.put("sasl.mechanism", SASL_MECHANISM);
> props.put("security.protocol", SECURITY_PROTOCOL);
> props.put("sasl.jaas.config", saslJaasConfig);
> props.put("request.timeout.ms", 6);
> props.put("session.timeout.ms", 6);
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>
>
> It works fine using following code in Databricks Notebook. The problem has
> been occurring when I run it through Apache beam and KafkaIO (Just
> providing more context if that may help you to understand problem)
>
> val df = spark.readStream
> .format("kafka")
> .option("subscribe", TOPIC)
> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
> .option("kafka.sasl.mechanism", "PLAIN")
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.jaas.config", EH_SASL)
> .option("kafka.request.timeout.ms", "6")
> .option("kafka.session.timeout.ms", "6")
> .option("failOnDataLoss", "false")
> //.option("kafka.group.id", "testsink")
> .option("startingOffsets", "latest")
> .load()
>
> Utkarsh
>
> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko 
> wrote:
>
>> Hi Utkarsh,
>>
>> Can it be related to this configuration problem?
>>
>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>
>> Did you check timeout settings?
>>
>> —
>> Alexey
>>
>>
>> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
>> wrote:
>>
>> Hello,
>>
>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
>> trying to create a simple streaming app with Apache Beam, where it reads
>> data from an Azure event hub and produces messages into another Azure event
>> hub.
>>
>> I'm creating and running spark jobs on Azure Databricks.
>>
>> The problem is the consumer (uses SparkRunner) is not able to read data
>> from Event hub (queue). There is no activity and no errors on the Spark
>> cluster.
>>
>> I would appreciate it if anyone could help to fix this issue.
>>
>> Thank you
>>
>> Utkarsh
>>
>>
>>
>


Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Alexey Romanenko
Hmm, this is strange. Which version of Kafka client do you use while running it 
with Beam?

> On 1 Feb 2022, at 16:56, Utkarsh Parekh  wrote:
> 
> Hi Alexey, 
> 
> First of all, thank you for the response! Yes I did have it in Consumer 
> configuration and try to increase "session.timeout".
> 
> From consumer side so far I've following settings:
> props.put("sasl.mechanism", SASL_MECHANISM);
> props.put("security.protocol", SECURITY_PROTOCOL);
> props.put("sasl.jaas.config", saslJaasConfig);
> props.put("request.timeout.ms <http://request.timeout.ms/>", 6);
> props.put("session.timeout.ms <http://session.timeout.ms/>", 6);
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
> 
> It works fine using following code in Databricks Notebook. The problem has 
> been occurring when I run it through Apache beam and KafkaIO (Just providing 
> more context if that may help you to understand problem)
> 
> val df = spark.readStream
> .format("kafka")
> .option("subscribe", TOPIC)
> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
> .option("kafka.sasl.mechanism", "PLAIN")
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.jaas.config", EH_SASL)
> .option("kafka.request.timeout.ms <http://kafka.request.timeout.ms/>", 
> "6")
> .option("kafka.session.timeout.ms <http://kafka.session.timeout.ms/>", 
> "6")
> .option("failOnDataLoss", "false")
> //.option("kafka.group.id <http://kafka.group.id/>", "testsink")
> .option("startingOffsets", "latest")
> .load()
> 
> Utkarsh
> 
> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko  <mailto:aromanenko@gmail.com>> wrote:
> Hi Utkarsh,
> 
> Can it be related to this configuration problem?
> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>  
> <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received>
> 
> Did you check timeout settings?
> 
> —
> Alexey
> 
> 
>> On 1 Feb 2022, at 02:27, Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote:
>> 
>> Hello,
>> 
>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying 
>> to create a simple streaming app with Apache Beam, where it reads data from 
>> an Azure event hub and produces messages into another Azure event hub. 
>> 
>> I'm creating and running spark jobs on Azure Databricks.
>> 
>> The problem is the consumer (uses SparkRunner) is not able to read data from 
>> Event hub (queue). There is no activity and no errors on the Spark cluster.
>> 
>> I would appreciate it if anyone could help to fix this issue.
>> 
>> Thank you
>> 
>> Utkarsh
> 



Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Utkarsh Parekh
Hi Alexey,

First of all, thank you for the response! Yes, I did have it in the consumer
configuration and tried to increase "session.timeout".

From the consumer side, so far I have the following settings:

props.put("sasl.mechanism", SASL_MECHANISM);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.jaas.config", saslJaasConfig);
props.put("request.timeout.ms", 6);
props.put("session.timeout.ms", 6);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);


It works fine using the following code in a Databricks notebook. The problem has
been occurring when I run it through Apache Beam and KafkaIO (just providing
more context in case that helps you understand the problem).

val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "6")
.option("kafka.session.timeout.ms", "6")
.option("failOnDataLoss", "false")
//.option("kafka.group.id", "testsink")
.option("startingOffsets", "latest")
.load()

Utkarsh
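
For comparison, a rough sketch (not the code from this thread) of how the same
consumer settings can be handed to Beam's KafkaIO; the Python cross-language
ReadFromKafka is shown for brevity, and in Java the equivalent is passing the
same map to KafkaIO.read().withConsumerConfigUpdates(...). All endpoint,
credential and timeout values below are placeholders.

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

consumer_config = {
    "bootstrap.servers": "<namespace>.servicebus.windows.net:9093",  # placeholder
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": "<Event Hub SASL JAAS config>",  # placeholder
    "request.timeout.ms": "60000",  # placeholder timeouts
    "session.timeout.ms": "60000",
    "auto.offset.reset": "latest",
    "group.id": "beam-eventhub-poc",  # placeholder group id
}

with beam.Pipeline() as p:  # add the Spark runner options you normally use
    _ = (p
         | ReadFromKafka(consumer_config=consumer_config, topics=["<topic>"])
         | beam.Map(print))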

On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko 
wrote:

> Hi Utkarsh,
>
> Can it be related to this configuration problem?
>
> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>
> Did you check timeout settings?
>
> —
> Alexey
>
>
> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
> wrote:
>
> Hello,
>
> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
> trying to create a simple streaming app with Apache Beam, where it reads
> data from an Azure event hub and produces messages into another Azure event
> hub.
>
> I'm creating and running spark jobs on Azure Databricks.
>
> The problem is the consumer (uses SparkRunner) is not able to read data
> from Event hub (queue). There is no activity and no errors on the Spark
> cluster.
>
> I would appreciate it if anyone could help to fix this issue.
>
> Thank you
>
> Utkarsh
>
>
>


Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-02-01 Thread Alexey Romanenko
Hi Utkarsh,

Can it be related to this configuration problem?
https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
 
<https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received>

Did you check timeout settings?

—
Alexey  


> On 1 Feb 2022, at 02:27, Utkarsh Parekh  wrote:
> 
> Hello,
> 
> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying 
> to create a simple streaming app with Apache Beam, where it reads data from 
> an Azure event hub and produces messages into another Azure event hub. 
> 
> I'm creating and running spark jobs on Azure Databricks.
> 
> The problem is the consumer (uses SparkRunner) is not able to read data from 
> Event hub (queue). There is no activity and no errors on the Spark cluster.
> 
> I would appreciate it if anyone could help to fix this issue.
> 
> Thank you
> 
> Utkarsh



Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark

2022-01-31 Thread Utkarsh Parekh
Hello,

I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying
to create a simple streaming app with Apache Beam, where it reads data from
an Azure event hub and produces messages into another Azure event hub.

I'm creating and running spark jobs on Azure Databricks.

The problem is the consumer (uses SparkRunner) is not able to read data
from Event hub (queue). There is no activity and no errors on the Spark
cluster.

I would appreciate it if anyone could help to fix this issue.

Thank you

Utkarsh


Re: Compatibility of Spark portable runner

2022-01-05 Thread Alexey Romanenko
Generally speaking, to avoid potential issues, the versions used at compile time 
and at runtime should be the same (the Scala version is the most important one) 
but, due to Spark’s backward compatibility, the minor versions can differ.
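
For reference, one quick way to see which Spark and Scala versions the cluster is
actually running is to ask the cluster itself; a small sketch, assuming a PySpark
shell or driver that can reach the target cluster:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # run against the target cluster
print("Spark version:", spark.version)
print("Scala version:",
      spark.sparkContext._jvm.scala.util.Properties.versionString())

Those two values are what should line up with the job server and runner artifacts
(the Spark 3 runner artifacts are compiled against Scala 2.12); minor Spark
versions can usually differ.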

> On 5 Jan 2022, at 07:50, Zheng Ni  wrote:
> 
> Hi Beam Community,
>  
> Greetings.
>  
> I am interested in submitting a spark job through portable runner. I have a 
> question about the compatibility between spark_job_server and spark cluster.
>  
> Let’s say I am going to use beam_spark_job_server of version 2.35.0. How 
> could I know which spark cluster version is compatible with it? Or could it 
> work with any versions of the spark cluster?
>  
> 
> Regards,
> Zheng 
> 



Compatibility of Spark portable runner

2022-01-04 Thread Zheng Ni
Hi Beam Community,



Greetings.



I am interested in submitting a spark job through portable runner. I have a
question about the compatibility between spark_job_server and spark cluster.



Let’s say I am going to use beam_spark_job_server of version 2.35.0. How
could I know which spark cluster version is compatible with it? Or could it
work with any versions of the spark cluster?





Regards,

Zheng


Re: Perf issue with Beam on spark (spark runner)

2021-10-12 Thread Alexey Romanenko
Robert,

Do you have any numbers by chance regarding this optimisation?

Alexey
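
As a rough illustration of the per-element coder round-trip that the quoted
discussion below attributes much of the overhead to, here is a hypothetical
micro-benchmark; the Python PickleCoder is used purely for convenience, while
the thread itself concerns the Java Spark runner.

import timeit
from apache_beam.coders import PickleCoder

coder = PickleCoder()
element = {"id": 123, "payload": "x" * 1000}  # arbitrary sample element
n = 100_000
secs = timeit.timeit(lambda: coder.decode(coder.encode(element)), number=n)
print(f"{n} encode/decode round-trips took {secs:.2f}s")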

> On 5 Oct 2021, at 00:27, Robert Bradshaw  wrote:
> 
> https://github.com/apache/beam/pull/15637 
> <https://github.com/apache/beam/pull/15637> might help some.
> 
> On Thu, Sep 9, 2021 at 5:21 PM Tao Li  <mailto:t...@zillow.com>> wrote:
> Thanks Mike for this info!
> 
>  
> 
> From: Mike Kaplinskiy mailto:m...@ladderlife.com>>
> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Date: Tuesday, September 7, 2021 at 2:15 PM
> To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Cc: Alexey Romanenko  <mailto:aromanenko@gmail.com>>, Andrew Pilloud  <mailto:apill...@google.com>>, Ismaël Mejía  <mailto:ieme...@gmail.com>>, Kyle Weaver  <mailto:kcwea...@google.com>>, Yuchu Cao  <mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> A long time ago when I was experimenting with the Spark runner for a batch 
> job, I noticed that a lot of time was spend in GC as well. In my case I 
> narrowed it down to how the Spark runner implements Coders. 
> 
>  
> 
> Spark's value prop is that it only serializes data when it truly has no other 
> choice - i.e. when it needs to reclaim memory or when it sends things over 
> the wire. Unfortunately due to the mismatch in serialization APIs between 
> Beam and Spark, Beam's Spark runner actually just serializes things all the 
> time. My theory was that the to/from byte array dance was slow. I attempted 
> to fix this at https://github.com/apache/beam/pull/8371
> but I could never actually reproduce a speedup in performance benchmarks.
> 
>  
> 
> If you're feeling up to it, you could try reviving something like that PR and 
> see if it helps.
> 
>  
> 
> Mike.
> 
> Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.
> 
>  
> 
>  
> 
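To make the overhead Mike describes concrete: each element crossing a Spark boundary gets pushed through its Beam coder and back. A rough Python illustration of that round trip (illustrative only — the runner does this in Java via its coder bridge, and the element here is made up):

from apache_beam import coders

# A typical KV element as the Spark runner would see it.
kv_coder = coders.TupleCoder((coders.StrUtf8Coder(), coders.VarIntCoder()))

element = ("auction-42", 1300)
encoded = kv_coder.encode(element)   # bytes materialized at the boundary
decoded = kv_coder.decode(encoded)   # parsed back again on the other side
assert decoded == element            # correctness is fine; the cost is CPU/GC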
> On Sat, Aug 14, 2021 at 4:35 PM Tao Li  <mailto:t...@zillow.com>> wrote:
> 
> @Alexey Romanenko <mailto:aromanenko@gmail.com> I tried out ParquetIO 
> splittable and the processing time improved from 10 min to 6 min, but still 
> much longer than 2 min using a native spark app.
> 
>  
> 
> We are still seeing a lot of GC cost from below call stack. Do you think this 
> ticket can fix this issue https://issues.apache.org/jira/browse/BEAM-12646 ? Thanks.
> 
>  
> 
> 
> 
>  
> 
>  
> 
>  
> 
> From: Tao Li mailto:t...@zillow.com>>
> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Date: Friday, August 6, 2021 at 11:12 AM
> To: Alexey Romanenko  <mailto:aromanenko@gmail.com>>
> Cc: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>, Andrew Pilloud 
> mailto:apill...@google.com>>, Ismaël Mejía 
> mailto:ieme...@gmail.com>>, Kyle Weaver 
> mailto:kcwea...@google.com>>, Yuchu Cao 
> mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
> 
>  
> 
> Thanks @Alexey Romanenko <mailto:aromanenko@gmail.com> please see my 
> clarifications below.
> 
>  
> 
>  
> 
> | “Well, of course, if you read all fields (columns) then you don’t need 
> column projection. Otherwise, it can give a quite significant performance 
> boost, especially for large tables with many columns.”

Re: Trying to run Beam on Spark cluster

2021-09-29 Thread Mark Striebeck
Thanks Kyle,

On Fri, Sep 24, 2021 at 1:48 PM Kyle Weaver  wrote:

> Hi Mark. Looks like a problem with artifact staging. PortableRunner
> implicitly requires a directory (configurable with --artifacts_dir, under
> /tmp by default) that is accessible by both the job server and Beam worker.
>

Hmmm, I guess I could create an NFS share between the machines and use that
for the artifacts_dir. But if I use enironment_type=DOCKER, the docker
image won't have access to that. Is there some easy way to modify the
docker command that the worker runs when it starts the docker image to map
this directory (via '-v') into the docker image?


> You should be able to get around this by using --runner SparkRunner
> instead:
>
> python -m apache_beam.examples.wordcount
>  gs://datapipeline-output/shakespeare-alls-11.txt --output
> gs://datapipeline-output/output/   --project august-ascent-325423
> --environment_type=DOCKER --runner SparkRunner
> --spark_rest_url http://hostname:6066
>
> This requires you to enable REST on your Spark master by putting
> `spark.master.rest.enabled` in your config, and then setting the Beam
> pipeline option --spark_rest_url to use its address (6066 is the default
> port).
>

I'll try that next while waiting for the answer above.

Thanks
 Mark

>
> This starts the job server for you, so you don't need to do that ahead of
> time.
>
> Best,
> Kyle
>
> On Sun, Sep 19, 2021 at 12:22 PM Mark Striebeck 
> wrote:
>
>> Hi,
>>
>> I am trying to run beam on a small spark cluster. I setup spark (master
>> plus one slave). I am using the portable runner and invoke the beam
>> pipeline with:
>>
>> python -m apache_beam.examples.wordcount
>>  gs://datapipeline-output/shakespeare-alls-11.txt --output
>> gs://datapipeline-output/output/   --project august-ascent-325423 --runner
>> PortableRunner --job_endpoint=localhost:8099 --environment_type=DOCKER
>>
>> I always get an error:
>> Caused by: java.util.concurrent.TimeoutException: Timed out while waiting
>> for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
>> apache/beam_python3.8_sdk:2.32.0 --id=4-1
>> --provision_endpoint=localhost:46757'
>>
>> It takes ~2.5 minutes to pull the beam image which should be enough. But
>> I pulled the image manually (docker pull apache/beam_python3.8_sdk:2.32.0)
>> and then tried to run the pipeline again.
>>
>> Now, when I run the pipeline I get an error:
>> java.io.FileNotFoundException:
>> /tmp/beam-artifact-staging/60321f712323c195764ab31b3e205b228a405fbb80b50fafa67b38b21959c63f/1-ref_Environment_default_e-pickled_main_session
>> (No such file or directory)
>>
>> and then further down
>>
>> ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>> java.lang.IllegalStateException: No container running for id
>> 7014a9ea98dc0b3f453a9d3860aff43ba42214195d2240d7cefcefcfabf93879
>>
>> (here is the full stack trace:
>> https://drive.google.com/file/d/1mRzt8G7I9Akkya48KfAbrqPp8wRCzXDe/view)
>>
>> Any pointer or idea is appreciated (sorry, if this is something obvious -
>> I'm still pretty new to beam/spark).
>>
>> Thanks
>>   Mark
>>
>
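The REST-based submission described above can also be expressed directly as pipeline options instead of command-line flags — a minimal sketch, with hostname:6066 standing in for a Spark master that has spark.master.rest.enabled turned on:

from apache_beam.options.pipeline_options import PipelineOptions

# hostname:6066 is a placeholder for your Spark master's REST endpoint.
options = PipelineOptions([
    "--runner=SparkRunner",
    "--spark_rest_url=http://hostname:6066",
    "--environment_type=DOCKER",
])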


Trying to run Beam on Spark cluster

2021-09-19 Thread Mark Striebeck
Hi,

I am trying to run beam on a small spark cluster. I setup spark (master
plus one slave). I am using the portable runner and invoke the beam
pipeline with:

python -m apache_beam.examples.wordcount
 gs://datapipeline-output/shakespeare-alls-11.txt --output
gs://datapipeline-output/output/   --project august-ascent-325423 --runner
PortableRunner --job_endpoint=localhost:8099 --environment_type=DOCKER

I always get an error:
Caused by: java.util.concurrent.TimeoutException: Timed out while waiting
for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
apache/beam_python3.8_sdk:2.32.0 --id=4-1
--provision_endpoint=localhost:46757'

It takes ~2.5 minutes to pull the beam image which should be enough. But I
pulled the image manually (docker pull apache/beam_python3.8_sdk:2.32.0)
and then tried to run the pipeline again.

Now, when I run the pipeline I get an error:
java.io.FileNotFoundException:
/tmp/beam-artifact-staging/60321f712323c195764ab31b3e205b228a405fbb80b50fafa67b38b21959c63f/1-ref_Environment_default_e-pickled_main_session
(No such file or directory)

and then further down

ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: No container running for id
7014a9ea98dc0b3f453a9d3860aff43ba42214195d2240d7cefcefcfabf93879

(here is the full stack trace:
https://drive.google.com/file/d/1mRzt8G7I9Akkya48KfAbrqPp8wRCzXDe/view)

Any pointer or idea is appreciated (sorry, if this is something obvious -
I'm still pretty new to beam/spark).

Thanks
  Mark


Re: java.io.InvalidClassException with Spark 3.1.2

2021-08-22 Thread Yu Watanabe
Sure. I starred your repository.

On Sat, Aug 21, 2021 at 11:27 AM cw  wrote:

> Hello Yu,
> I've done a lot of testing; it only works for Spark 2.x, not 3. If you need a
> working example on kubernetes,
> https://github.com/cometta/python-apache-beam-spark , feel free to
> improve the code, if you would like to contribute. Help me by starring it if it
> is useful for you. Thank you
>
> On Monday, August 16, 2021, 12:37:46 AM GMT+8, Yu Watanabe <
> yu.w.ten...@gmail.com> wrote:
>
>
> Hello .
>
> I would like to ask question for spark runner.
>
> Using spark downloaded from below link,
>
>
> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
>
> I get below error when submitting a pipeline.
> Full error is on
> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
>
>
> --
> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection
> from /192.168.11.2:35601
> java.io.InvalidClassException:
> scala.collection.mutable.WrappedArray$ofRef; local class incompatible:
> stream classdesc serialVersionUID = 3456489343829468865, local class
> serialVersionUID = 1028182004549731694
> at
> java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
> ...
>
> --
>
> SDK Harness and Job service are deployed as below.
>
> 1. SDK Harness
>
> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
>
> 2. Job service
>
> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
>
> * apache/beam_spark_job_server:2.31.0 for spark 2.4.8
>
> 3. SDK client code
>
> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
>
> Spark 2.4.8 succeeded without any errors using above components.
>
>
> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
>
> Would there be any setting which you need to be aware of for spark 3.1.2 ?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: java.io.InvalidClassException with Spark 3.1.2

2021-08-21 Thread cw
 Hello Yu,   I've done a lot of testing; it only works for Spark 2.x, not 3. If you 
need a working example on kubernetes, 
https://github.com/cometta/python-apache-beam-spark , feel free to improve the 
code, if you would like to contribute. Help me by starring it if it is useful for 
you. Thank you

On Monday, August 16, 2021, 12:37:46 AM GMT+8, Yu Watanabe 
 wrote:  
 
 Hello .
I would like to ask question for spark runner.
Using spark downloaded from below link,
https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

I get below error when submitting a pipeline. Full error is on 
https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
--
21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
 at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
...
--
SDK Harness and Job service are deployed as below.
1. SDK Harness
sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 
--spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true

2. Job service
sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool

* apache/beam_spark_job_server:2.31.0 for spark 2.4.8
3. SDK client code
https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2

Spark 2.4.8 succeeded without any errors using above components.
https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

Would there be any setting which you need to be aware of for spark 3.1.2 ?
Thanks,
Yu Watanabe
-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis
      

Re: java.io.InvalidClassException with Spark 3.1.2

2021-08-19 Thread Yu Watanabe
Kyle.

Thank you.

On Tue, Aug 17, 2021 at 5:55 AM Kyle Weaver  wrote:

> I was able to reproduce the error. I'm not sure why this would happen,
> since as far as I can tell the Beam 2.31.0 Spark runner should be using
> Spark 3.1.2 and Scala 2.12 [1]. I filed a JIRA issue for it. [2]
>
> [1]
> https://github.com/apache/beam/pull/14897/commits/b6fca2bb79d9e7a69044b477460445456720ec58
> [2] https://issues.apache.org/jira/browse/BEAM-12762
>
>
> On Sun, Aug 15, 2021 at 9:37 AM Yu Watanabe  wrote:
>
>> Hello .
>>
>> I would like to ask question for spark runner.
>>
>> Using spark downloaded from below link,
>>
>>
>> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
>>
>> I get below error when submitting a pipeline.
>> Full error is on
>> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
>>
>>
>> --
>> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection
>> from /192.168.11.2:35601
>> java.io.InvalidClassException:
>> scala.collection.mutable.WrappedArray$ofRef; local class incompatible:
>> stream classdesc serialVersionUID = 3456489343829468865, local class
>> serialVersionUID = 1028182004549731694
>> at
>> java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
>> ...
>>
>> ------
>>
>> SDK Harness and Job service are deployed as below.
>>
>> 1. SDK Harness
>>
>> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
>> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
>>
>> 2. Job service
>>
>> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
>>
>> * apache/beam_spark_job_server:2.31.0 for spark 2.4.8
>>
>> 3. SDK client code
>>
>> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
>>
>> Spark 2.4.8 succeeded without any errors using above components.
>>
>>
>> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
>>
>> Would there be any setting which you need to be aware of for spark 3.1.2 ?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


java.io.InvalidClassException with Spark 3.1.2

2021-08-15 Thread Yu Watanabe
Hello .

I would like to ask question for spark runner.

Using spark downloaded from below link,

https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

I get below error when submitting a pipeline.
Full error is on
https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.

--
21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection
from /192.168.11.2:35601
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef;
local class incompatible: stream classdesc serialVersionUID =
3456489343829468865, local class serialVersionUID = 1028182004549731694
at
java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
...
--

SDK Harness and Job service are deployed as below.

1. SDK Harness

sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
--spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true

2. Job service

sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool

* apache/beam_spark_job_server:2.31.0 for spark 2.4.8

3. SDK client code

https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2

Spark 2.4.8 succeeded without any errors using above components.

https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

Would there be any setting which you need to be aware of for spark 3.1.2 ?

Thanks,
Yu Watanabe

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


example on python + spark + k8s

2021-08-15 Thread cw
Hello
 I created an example of running python code on Apache Beam + Spark + 
Kubernetes at https://github.com/cometta/python-apache-beam-spark because I was 
unable to find any step-by-step guide on how to do this for the past few weeks. 
I hope it will be useful for others who are also looking for a similar 
implementation. Have a good day. Contributors are welcome to further improve 
the code.

Thank you





Re: Submit Python Beam on Spark Dataproc

2021-08-15 Thread Yu Watanabe
Hello Mahan.

Sorry for the late reply.

> Still waiting for startup of environment from localhost:5 for worker
id 1-1

From the message, it seems that something is wrong with the connection between
Worker node in spark cluster and SDK harness.

According to this slide, the runner worker (in your context, the Spark worker)
should also have connectivity with the SDK harness container.

https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_0

Could you please also try setting ssh tunneling to spark worker node as
well ?
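
Concretely, the address given to --environment_config has to be resolvable and reachable from the Spark executors themselves, not just from the machine that submits the job — a minimal sketch, where the worker-pool host is a placeholder:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder host: wherever `apache/beam_python3.7_sdk --worker_pool` runs,
# as seen from the Spark worker nodes (not "localhost" on your laptop).
WORKER_POOL = "beam-worker-pool.example.internal:50000"

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",        # job server, reachable from the client
    "--environment_type=EXTERNAL",
    "--environment_config=" + WORKER_POOL,  # must be reachable from the executors
])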

Thanks,
Yu

On Thu, Aug 12, 2021 at 9:07 PM Mahan Hosseinzadeh 
wrote:

> Thanks Yu for the help and the tips.
>
> I ran the following steps but my job is stuck and can't get submitted to
> Dataproc and I keep getting this message in job-server:
> Still waiting for startup of environment from localhost:5 for worker
> id 1-1
>
>
> -
> *Beam code:*
> pipeline_options = PipelineOptions([
> "--runner=PortableRunner",
> "--job_endpoint=localhost:8099",
> "--environment_type=EXTERNAL",
> "--environment_config=localhost:5"
> ])
>
> -
> *Job Server:*
> I couldn't use Docker because host networking doesn't work on Mac OS and I
> used Gradle instead
>
> ./gradlew :runners:spark:3:job-server:runShadow
>
> -
> *Beam Worker Pool:*
> docker run -p=5:5 apache/beam_python3.7_sdk --worker_pool
>
> -
> *SSH tunnel to the master node:*
> gcloud compute ssh  \
> --project  \
> --zone   \
> -- -NL 7077:localhost:7077
>
> -
>
> Thanks,
> Mahan
>
> On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe  wrote:
>
>> Hello .
>>
>> Would this page help ? I hope it helps.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Running on a pre-deployed Spark cluster
>>
>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>> 7077 the master url port?
>> * Yes.
>>
>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>> * Job server should be able to communicate with Spark master node port
>> 7077. So I believe it is Yes.
>>
>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>> Harness Configuration?
>> * This is the configuration of how you want  your harness container to
>> spin up.
>>
>> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>
>> For DOCKER , you will need docker deployed on all spark worker nodes.
>> > User code is executed within a container started on each worker node
>>
>> I used EXTERNAL when I did it with flink cluster before.
>>
>> e.g
>>
>> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>>
>> 4- Should we run the job-server outside of the Dataproc cluster or should
>> we run it in the master node?
>> * Depends. It could be inside or outside the master node. But if you are
>> connecting to full managed service, then outside might be better.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Start JobService that will connect with the Spark master
>>
>> Thanks,
>> Yu
>>
>> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a Python Beam job that works on Dataflow but we would like to
>>> submit it on a Spark Dataproc cluster with no Flink involvement.
>>> I already spent days but failed to figure out how to use PortableRunner
>>> with the beam_spark_job_server to submit my Python Beam job to Spark
>>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>>> Spark with Dataproc.
>>> Some relevant questions might be:
>>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>>> 7077 the master url port?
>>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>>> Harness Configuration?
>>> 4- Should we run the job-server outside of the Dataproc cluster or
>>> should we run it in the master node?
>>>
>>> Thanks,
>>> Mahan
>>>
>>
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Submit Python Beam on Spark Dataproc

2021-08-12 Thread Mahan Hosseinzadeh
Thanks Yu for the help and the tips.

I ran the following steps but my job is stuck and can't get submitted to
Dataproc and I keep getting this message in job-server:
Still waiting for startup of environment from localhost:5 for worker id
1-1

-
*Beam code:*
pipeline_options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=EXTERNAL",
"--environment_config=localhost:5"
])
-
*Job Server:*
I couldn't use Docker because host networking doesn't work on Mac OS and I
used Gradle instead

./gradlew :runners:spark:3:job-server:runShadow
-
*Beam Worker Pool:*
docker run -p=5:5 apache/beam_python3.7_sdk --worker_pool
-
*SSH tunnel to the master node:*
gcloud compute ssh  \
--project  \
--zone   \
-- -NL 7077:localhost:7077
-

Thanks,
Mahan

On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe  wrote:

> Hello .
>
> Would this page help ? I hope it helps.
>
> https://beam.apache.org/documentation/runners/spark/
>
> > Running on a pre-deployed Spark cluster
>
> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
> 7077 the master url port?
> * Yes.
>
> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
> * Job server should be able to communicate with Spark master node port
> 7077. So I believe it is Yes.
>
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
> * This is the configuration of how you want  your harness container to
> spin up.
>
> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>
> For DOCKER , you will need docker deployed on all spark worker nodes.
> > User code is executed within a container started on each worker node
>
> I used EXTERNAL when I did it with flink cluster before.
>
> e.g
>
> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>
> 4- Should we run the job-server outside of the Dataproc cluster or should
> we run it in the master node?
> * Depends. It could be inside or outside the master node. But if you are
> connecting to full managed service, then outside might be better.
>
> https://beam.apache.org/documentation/runners/spark/
>
> > Start JobService that will connect with the Spark master
>
> Thanks,
> Yu
>
> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh 
> wrote:
>
>> Hi,
>>
>> I have a Python Beam job that works on Dataflow but we would like to
>> submit it on a Spark Dataproc cluster with no Flink involvement.
>> I already spent days but failed to figure out how to use PortableRunner
>> with the beam_spark_job_server to submit my Python Beam job to Spark
>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>> Spark with Dataproc.
>> Some relevant questions might be:
>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>> 7077 the master url port?
>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>> Harness Configuration?
>> 4- Should we run the job-server outside of the Dataproc cluster or should
>> we run it in the master node?
>>
>> Thanks,
>> Mahan
>>
>
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>


Re: Submit Python Beam on Spark Dataproc

2021-08-10 Thread Yu Watanabe
Hello .

Would this page help ? I hope it helps.

https://beam.apache.org/documentation/runners/spark/

> Running on a pre-deployed Spark cluster

1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077
the master url port?
* Yes.

2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
* Job server should be able to communicate with Spark master node port
7077. So I believe it is Yes.

3- What's the environment_type? Can we use DOCKER? Then what's the SDK
Harness Configuration?
* This is the configuration of how you want  your harness container to spin
up.

https://beam.apache.org/documentation/runtime/sdk-harness-config/

For DOCKER , you will need docker deployed on all spark worker nodes.
> User code is executed within a container started on each worker node

I used EXTERNAL when I did it with flink cluster before.

e.g
https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14

4- Should we run the job-server outside of the Dataproc cluster or should
we run it in the master node?
* Depends. It could be inside or outside the master node. But if you are
connecting to full managed service, then outside might be better.

https://beam.apache.org/documentation/runners/spark/

> Start JobService that will connect with the Spark master

Thanks,
Yu

On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh 
wrote:

> Hi,
>
> I have a Python Beam job that works on Dataflow but we would like to
> submit it on a Spark Dataproc cluster with no Flink involvement.
> I already spent days but failed to figure out how to use PortableRunner
> with the beam_spark_job_server to submit my Python Beam job to Spark
> Dataproc. All the Beam docs are about Flink and there is no guideline about
> Spark with Dataproc.
> Some relevant questions might be:
> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
> 7077 the master url port?
> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
> 4- Should we run the job-server outside of the Dataproc cluster or should
> we run it in the master node?
>
> Thanks,
> Mahan
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Submit Python Beam on Spark Dataproc

2021-08-10 Thread Mahan Hosseinzadeh
Hi,

I have a Python Beam job that works on Dataflow but we would like to submit
it on a Spark Dataproc cluster with no Flink involvement.
I already spent days but failed to figure out how to use PortableRunner
with the beam_spark_job_server to submit my Python Beam job to Spark
Dataproc. All the Beam docs are about Flink and there is no guideline about
Spark with Dataproc.
Some relevant questions might be:
1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077
the master url port?
2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
3- What's the environment_type? Can we use DOCKER? Then what's the SDK
Harness Configuration?
4- Should we run the job-server outside of the Dataproc cluster or should
we run it in the master node?

Thanks,
Mahan


Re: Perf issue with Beam on spark (spark runner)

2021-08-06 Thread Alexey Romanenko


> On 5 Aug 2021, at 18:17, Tao Li  wrote:
> 
> It was a great presentation!

Thanks!

>  Regarding my perf testing, I was not doing aggregation, filtering, 
> projection or joining. I was simply reading all the fields of parquet and 
> then immediately save PCollection back to parquet.

Well, of course, if you read all fields (columns) then you don’t need column 
projection. Otherwise, it can give a quite significant performance boost, 
especially for large tables with many columns. 


> Regarding SDF translation, is it enabled by default?

From Beam 2.30.0 release notes:

"Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI 
opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based 
Read transforms 
([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))”
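
So since 2.30.0 the non-SDF read is what the Spark runner uses by default; the SDF-based read only comes back if you opt in via the experiment flag named in that release note — roughly like this (a sketch, nothing here is specific to your job):

from apache_beam.options.pipeline_options import PipelineOptions

# Default behaviour since 2.30.0: legacy (non-SDF) Read on the Spark runner.
default_opts = PipelineOptions(["--runner=SparkRunner"])

# Explicitly opt back in to the SDF-based Read, e.g. to compare the two:
sdf_opts = PipelineOptions(["--runner=SparkRunner", "--experiments=use_sdf_read"])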

—
Alexey

>  I will check out ParquetIO splittable. Thanks!
>  
> From: Alexey Romanenko 
> Date: Thursday, August 5, 2021 at 6:40 AM
> To: Tao Li 
> Cc: "user@beam.apache.org" , Andrew Pilloud 
> , Ismaël Mejía , Kyle Weaver 
> , Yuchu Cao 
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> It’s very likely that Spark SQL may have much better performance because of 
> SQL push-downs and avoiding additional ser/deser operations.
>  
> In the same time, did you try to leverage "withProjection()” in ParquetIO and 
> project only the fields that you needed? 
>  
> Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
>  
> Also, using SDF translation for Read on Spark Runner can cause performance 
> degradation as well (we noticed that in our experiments). Try to use non-SDF 
> read (if not yet) [2]
>  
>  
> PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m 
> not sure if a recording is already available but you can find the slides here 
> [3] that can be helpful.
>  
>  
> —
> Alexey
>  
> [1] https://issues.apache.org/jira/browse/BEAM-12070
> [2] https://issues.apache.org/jira/browse/BEAM-10670
> [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
>  
> 
> 
>> On 5 Aug 2021, at 03:07, Tao Li mailto:t...@zillow.com>> 
>> wrote:
>>  
>> @Alexey Romanenko <mailto:aromanenko@gmail.com> @Ismaël Mejía 
>> <mailto:ieme...@gmail.com> I assume you are experts on spark runner. Can you 
>> please take a look at this thread and confirm this jira covers the causes 
>> https://issues.apache.org/jira/browse/BEAM-12646 ?
>>  
>> This perf issue is currently a blocker to me..
>>  
>> Thanks so much!
>>  
>> From: Tao Li mailto:t...@zillow.com>>
>> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
>> mailto:user@beam.apache.org>>
>> Date: Friday, July 30, 2021 at 3:53 PM
>> To: Andrew Pilloud mailto:apill...@google.com>>, 
>> "user@beam.apache.org <mailto:user@beam.apache.org>" > <mailto:user@beam.apache.org>>
>> Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu 
>> Cao mailto:yuc...@trulia.com>

Re: Spark Structured Streaming runner migrated to Spark 3

2021-08-05 Thread Austin Bennett
Hooray!  Thanks, Etienne!

On Thu, Aug 5, 2021 at 3:11 AM Etienne Chauchot 
wrote:

> Hi all,
>
> Just to let you know that Spark Structured Streaming runner was migrated
> to Spark 3.
>
> Enjoy !
>
> Etienne
>
>


Re: Perf issue with Beam on spark (spark runner)

2021-08-05 Thread Tao Li
Hi Alexey,

It was a great presentation!

Regarding my perf testing, I was not doing aggregation, filtering, projection 
or joining. I was simply reading all the fields of parquet and then immediately 
save PCollection back to parquet.

Regarding SDF translation, is it enabled by default?

I will check out ParquetIO splittable. Thanks!

From: Alexey Romanenko 
Date: Thursday, August 5, 2021 at 6:40 AM
To: Tao Li 
Cc: "user@beam.apache.org" , Andrew Pilloud 
, Ismaël Mejía , Kyle Weaver 
, Yuchu Cao 
Subject: Re: Perf issue with Beam on spark (spark runner)

It’s very likely that Spark SQL may have much better performance because of SQL 
push-downs and avoiding additional ser/deser operations.

At the same time, did you try to leverage "withProjection()” in ParquetIO and 
project only the fields that you needed?

Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?

Also, using SDF translation for Read on Spark Runner can cause performance 
degradation as well (we noticed that in our experiments). Try to use non-SDF 
read (if not yet) [2]


PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m not 
sure if a recording is already available but you can find the slides here [3] 
that can be helpful.


—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-12070
[2] https://issues.apache.org/jira/browse/BEAM-10670
[3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing



On 5 Aug 2021, at 03:07, Tao Li mailto:t...@zillow.com>> wrote:

@Alexey Romanenko<mailto:aromanenko@gmail.com> @Ismaël 
Mejía<mailto:ieme...@gmail.com> I assume you are experts on spark runner. Can 
you please take a look at this thread and confirm this jira covers the causes 
https://issues.apache.org/jira/browse/BEAM-12646 ?

This perf issue is currently a blocker to me..

Thanks so much!

From: Tao Li mailto:t...@zillow.com>>
Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
mailto:user@beam.apache.org>>
Date: Friday, July 30, 2021 at 3:53 PM
To: Andrew Pilloud mailto:apill...@google.com>>, 
"user@beam.apache.org<mailto:user@beam.apache.org>" 
mailto:user@beam.apache.org>>
Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu Cao 
mailto:yuc...@trulia.com>>
Subject: Re: Perf issue with Beam on spark (spark runner)

Thanks everyone for your help.

We actually did another round of perf comparison between Beam (on spark) and 
native spark, without any projection/filtering in the query (to rule out the 
“predicate pushdown” factor).

The time spent on Beam with spark runner is still taking 3-5x period of time 
compared with native spark, and the cause 
is https://issues.apache.org/jira/browse/BEAM-12646
 according to the spark metrics. Spark runner is pretty much the bottleneck.



From: Andrew Pilloud mailto:apill...@google.com>>
Date: Thursday, July 29,

Re: Perf issue with Beam on spark (spark runner)

2021-08-05 Thread Alexey Romanenko
It’s very likely that Spark SQL may have much better performance because of SQL 
push-downs and avoiding additional ser/deser operations.

At the same time, did you try to leverage "withProjection()” in ParquetIO and 
project only the fields that you needed? 

Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?

Also, using SDF translation for Read on Spark Runner can cause performance 
degradation as well (we noticed that in our experiments). Try to use non-SDF 
read (if not yet) [2]


PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m not 
sure if a recording is already available but you can find the slides here [3] 
that can be helpful.


—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-12070
[2] https://issues.apache.org/jira/browse/BEAM-10670
[3] 
https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
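
As a concrete illustration of the column-projection point, the Python SDK exposes the same knob through the columns argument of ReadFromParquet — a minimal sketch, with the path and column names as placeholders:

import apache_beam as beam
from apache_beam.io.parquetio import ReadFromParquet

# Placeholder path/columns: reading only the columns you actually need lets
# the Parquet reader skip the rest, analogous to withProjection in ParquetIO.
with beam.Pipeline() as p:
    bids = p | ReadFromParquet(
        "s3://bucket/path/to/table/*.parquet",
        columns=["auction", "bidder", "price"],
    )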
 

> On 5 Aug 2021, at 03:07, Tao Li  wrote:
> 
> @Alexey Romanenko <mailto:aromanenko@gmail.com> @Ismaël Mejía 
> <mailto:ieme...@gmail.com> I assume you are experts on spark runner. Can you 
> please take a look at this thread and confirm this jira covers the causes 
> https://issues.apache.org/jira/browse/BEAM-12646 ?
>  
> This perf issue is currently a blocker to me..
>  
> Thanks so much!
>  
> From: Tao Li mailto:t...@zillow.com>>
> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Date: Friday, July 30, 2021 at 3:53 PM
> To: Andrew Pilloud mailto:apill...@google.com>>, 
> "user@beam.apache.org <mailto:user@beam.apache.org>"  <mailto:user@beam.apache.org>>
> Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu Cao 
> mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> Thanks everyone for your help.
>  
> We actually did another round of perf comparison between Beam (on spark) and 
> native spark, without any projection/filtering in the query (to rule out the 
> “predicate pushdown” factor).
>  
> The time spent on Beam with spark runner is still taking 3-5x period of time 
> compared with native spark, and the cause 
> is https://issues.apache.org/jira/browse/BEAM-12646
>  according to the spark metrics. Spark runner is pretty much the bottleneck.
>  
> 
>  
> From: Andrew Pilloud mailto:apill...@google.com>>
> Date: Thursday, July 29, 2021 at 2:11 PM
> To: "user@beam.apache.org <mailto:user@beam.apache.org>" 
> mailto:user@beam.apache.org>>
> Cc: Tao Li mailto:t...@zillow.com>>, Kyle Weaver 
> mailto:kcwea...@google.com>>, Yuchu Cao 
> mailto:yuc...@trulia.com>>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
>  
> Andrew
>  
> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud  <mailto:apill...@google.com>> wrote:
>> Beam SQL doesn't currently have project pushdown for ParquetIO (we are 
>> working to expand this to more IOs). Using ParquetIO withProjection directly 
>> will produce better results.
>>  
>> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw > <mailto:rober...@google.com>> wrote:
>>> Could you try using Beam SQL [1] and see if that gives more similar result 
>>> to your Spark SQL query? I would also be curious if the performance is 
>>> sufficient using withProjection to only read the auction, price, and bidder 
>>> columns. 
>>>  
>>> [1] https://beam.apache.org/documentation/dsls/sql/overview/ 

Spark Structured Streaming runner migrated to Spark 3

2021-08-05 Thread Etienne Chauchot

Hi all,

Just to let you know that Spark Structured Streaming runner was migrated 
to Spark 3.


Enjoy !

Etienne



Re: Spark Structured Streaming Runner Roadmap

2021-08-03 Thread Etienne Chauchot

Hi,

Sorry for the late answer: the streaming mode in the Spark Structured 
Streaming runner is blocked by the watermark implementation in the Spark 
Structured Streaming framework itself, on the Apache Spark side. See


https://echauchot.blogspot.com/2020/11/watermark-architecture-proposal-for.html

best

Etienne

On 20/05/2021 20:37, Yu Zhang wrote:

Hi Beam Community,

Would there be any roadmap for Spark Structured Runner to support streaming and 
Splittable DoFn API? Like the specific timeline or release version.

Thanks,
Yu


example on running python apache beam with standalone spark

2021-07-28 Thread cw
I start the Spark docker with this commands on the host machine
curl -LO 
https://raw.githubusercontent.com/bitnami/bitnami-docker-spark/master/docker-compose.yml
# edit and expose port 7077 to host machine
$ docker-compose up


then, i run docker run --net=host apache/beam_spark_job_server:latest 
--spark-master-url=spark://localhost:7077 on the host machine

lastly, i run this command on the host machine python -m 
apache_beam.examples.wordcount --input ./a_file_input \
 --output ./counts \
 --runner=PortableRunner 
--job_endpoint=localhost:8099 --environment_type=LOOPBACK


I get below errors from the beam_spark_job_server docker container

21/07/28 13:41:56 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking 
job BeamApp-root-0728134156-a85a64c5_019bd04e-aabc-4deb-8b12-8c61f173cd5b
21/07/28 13:41:56 INFO org.apache.beam.runners.jobsubmission.JobInvocation: 
Starting job invocation 
BeamApp-root-0728134156-a85a64c5_019bd04e-aabc-4deb-8b12-8c61f173cd5b
21/07/28 13:41:56 INFO 
org.apache.beam.runners.core.construction.resources.PipelineResources: 
PipelineOptions.filesToStage was not specified. Defaulting to files from the 
classpath: will stage 6 files. Enable logging at DEBUG level to see which files 
will be staged.
21/07/28 13:41:57 INFO 
org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a brand 
new Spark Context.
21/07/28 13:41:57 WARN org.apache.spark.util.Utils: Your hostname, cometstrike 
resolves to a loopback address: 127.0.1.1; using 192.168. instead (on 
interface wlo1)
21/07/28 13:41:57 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you 
need to bind to another address
21/07/28 13:41:57 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
21/07/28 13:42:57 ERROR 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application has 
been killed. Reason: All masters are unresponsive! Giving up.
21/07/28 13:42:57 WARN 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application ID 
is not initialized yet.
21/07/28 13:42:57 WARN 
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint: Drop 
UnregisterApplication(null) because has not yet connected to master
21/07/28 13:42:57 ERROR org.apache.spark.SparkContext: Error initializing 
SparkContext.
java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:560)
    at 
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at 
org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
    at 
org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)
    at 
org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:118)
    at 
org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/07/28 13:42:57 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener 
AppStatusListener threw an exception
java.lang.NullPointerException
    at 
org.apache.spark.status.AppStatusListener.onApplicationEnd(AppStatusListener.scala:167)
    at 
org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57)
    at 
org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at 
org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
    at 
org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
    at 
org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
    at 
org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
    at 
org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58

Re: spark-submit and the portable runner

2021-06-24 Thread Trevor Kramer
Thanks. That was the issue. The latest release of Beam indicates it
supports Spark 3 and I find references in the code to Hadoop 3.2.1. Is it
possible to configure beam to run on Hadoop 3.2.1?

Trevor

On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver  wrote:

> Looks like a version mismatch between Hadoop dependencies [1]. Beam's
> Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
> Spark and Hadoop versions are your cluster using?
>
> [1]
> https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
> [2] https://issues.apache.org/jira/browse/BEAM-12094
>
> On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer 
> wrote:
>
>> Does anyone have an example of using spark-submit to run a beam job using
>> the portable runner? The documentation indicates this is possible but
>> doesn't give an example of how to do it.
>>
>> I am using the following pipeline options to generate the jar
>>
>> options = PipelineOptions(['--runner=SparkRunner',
>>'--environment_type=DOCKER',
>>'--environment_config=path-to-image:latest',
>>'--output_executable_path=output.jar'
>>])
>>
>> and am trying to run it with
>>
>> spark-submit --master yarn --deploy-mode cluster --class
>> org.apache.beam.runners.spark.SparkPipelineRunner output.jar
>>
>>
>> but I get the following error
>>
>>
>> Exception in thread "main" java.lang.IllegalAccessError: class
>> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
>> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
>>
>> at java.lang.ClassLoader.defineClass1(Native Method)
>>
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>>
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>>
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
>>
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>>
>> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>>
>> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
>>
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)
>>
>> at
>> org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)
>>
>> at scala.Option.getOrElse(Option.scala:189)
>>
>> at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)
>>
>> at
>> org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
>>
>> at org.apache.spark.deploy.SparkSubmit.org
>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
>>
>> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>>
>> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>>
>> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>>
>> at
>> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
>>
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
>>
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>


spark-submit and the portable runner

2021-06-21 Thread Trevor Kramer
Does anyone have an example of using spark-submit to run a beam job using
the portable runner? The documentation indicates this is possible but
doesn't give an example of how to do it.

I am using the following pipeline options to generate the jar

options = PipelineOptions(['--runner=SparkRunner',
   '--environment_type=DOCKER',
   '--environment_config=path-to-image:latest',
   '--output_executable_path=output.jar'
   ])

and am trying to run it with

spark-submit --master yarn --deploy-mode cluster --class
org.apache.beam.runners.spark.SparkPipelineRunner output.jar


but I get the following error


Exception in thread "main" java.lang.IllegalAccessError: class
org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:757)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)

at java.net.URLClassLoader.access$100(URLClassLoader.java:74)

at java.net.URLClassLoader$1.run(URLClassLoader.java:369)

at java.net.URLClassLoader$1.run(URLClassLoader.java:363)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:362)

at java.lang.ClassLoader.loadClass(ClassLoader.java:419)

at java.lang.ClassLoader.loadClass(ClassLoader.java:352)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:348)

at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)

at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)

at java.util.ServiceLoader$1.next(ServiceLoader.java:480)

at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)

at
org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)

at scala.Option.getOrElse(Option.scala:189)

at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)

at
org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)

at org.apache.spark.deploy.SparkSubmit.org
$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)

at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)

at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)

at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)

at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Re: How to specify a spark config with Beam spark runner

2021-06-17 Thread Tao Li
Hi Alexey,

Thanks we will give it a try.

From: Alexey Romanenko 
Reply-To: "user@beam.apache.org" 
Date: Thursday, June 10, 2021 at 5:14 AM
To: "user@beam.apache.org" 
Subject: Re: How to specify a spark config with Beam spark runner

Hi Tao,

"Limited spark options”, that you mentioned, are Beam's application arguments 
and if you run your job via "spark-submit" you should still be able to 
configure Spark application via normal spark-submit “--conf key=value” CLI 
option.
Doesn’t it work for you?

—
Alexey


On 10 Jun 2021, at 01:29, Tao Li mailto:t...@zillow.com>> 
wrote:

Hi Beam community,

We are trying to specify a spark config 
“spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit 
command for a beam app. I only see limited spark options supported according to 
this doc: 
https://beam.apache.org/documentation/runners/spark/

How can we specify an arbitrary spark config? Please advise. Thanks!



Re: How to specify a spark config with Beam spark runner

2021-06-10 Thread Alexey Romanenko
Hi Tao,

"Limited spark options”, that you mentioned, are Beam's application arguments 
and if you run your job via "spark-submit" you should still be able to 
configure Spark application via normal spark-submit “--conf key=value” CLI 
option. 
Doesn’t it work for you?

—
Alexey
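
For a concrete illustration: besides passing “--conf key=value” flags to spark-submit, a programmatic alternative with the classic Java SparkRunner is to hand the runner a pre-configured Spark context. The sketch below assumes Beam's SparkContextOptions interface and its provided-context setters; treat it as a rough outline under those assumptions rather than a tested recipe.

```
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ProvidedSparkContextExample {
  public static void main(String[] args) {
    // Build a SparkConf carrying the arbitrary Spark property.
    SparkConf conf = new SparkConf()
        .setAppName("beam-on-spark")
        .set("spark.hadoop.fs.s3a.canned.acl", "BucketOwnerFullControl");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Hand the pre-configured context to the Spark runner.
    SparkContextOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkContextOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true);
    options.setProvidedSparkContext(jsc);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}
```

The provided-context route only makes sense when your driver code constructs the JavaSparkContext itself; when submitting a fat jar through spark-submit, the “--conf” flags remain the simpler option.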

> On 10 Jun 2021, at 01:29, Tao Li  wrote:
> 
> Hi Beam community,
>  
> We are trying to specify a spark config 
> “spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit 
> command for a beam app. I only see limited spark options supported according 
> to this doc: https://beam.apache.org/documentation/runners/spark/ 
> <https://beam.apache.org/documentation/runners/spark/>
>  
> How can we specify an arbitrary spark config? Please advise. Thanks!



How to specify a spark config with Beam spark runner

2021-06-09 Thread Tao Li
Hi Beam community,

We are trying to specify a spark config 
“spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit 
command for a beam app. I only see limited spark options supported according to 
this doc: https://beam.apache.org/documentation/runners/spark/

How can we specify an arbitrary spark config? Please advise. Thanks!




Re: JdbcIO parallel read on spark

2021-05-27 Thread Thomas Fredriksen(External)
Quick update.

After some testing, we have noticed that the splittable JdbcIO-poc works
well when the number of splits does not exceed the number of spark tasks.
In cases where the number of splits does exceed the task count, the pipeline
freezes after each worker has processed a single split.

In our case, we are attempting to read 895676 rows, with 1 row splits.
We are running this on a single Spark worker for testing purposes, and the
stage is split into 8 tasks. The pipeline freezes after 8 splits have been
processed. We are able to verify that the @ProcessElements-function returns
without exceptions.

On the direct-runner, we noticed that setting the split size to less than
the checkpoint-size (hardcoded to 100) mitigated this problem. Any higher
value will cause the pipeline to freeze just like with the SparkRunner.

Is there anyone who can help shine a light on this?

On Wed, May 26, 2021 at 7:50 AM Thomas Fredriksen(External) <
thomas.fredrik...@cognite.com> wrote:

> There is no forking after the "Generate Queries" transform.
>
> We noticed that the "Generate Queries" transform is in a different stage
> than the reading itself. This is likely due to the Reparallelize-transform,
> and we also see this with JdbcIO.readAll.
>
> After reading up on Splittable DoFn's, we decided to give it a try. We
> essentially copied the source of JdbcIO into our project and changed
> `ReadFn` into `SplittableJdbcIO` which acts as (more or less) a drop-in
> replacement (see source below).
>
> The DAG here is seemingly simpler with a single stage containing all steps
> from reading the DB to writing. We are also seeing that the job is
> parallelizing much better than before.
>
> class SplittableJdbcIO {
>> /* ... */
>> @ProcessElement
>> public void processElement(@Element ParameterT element,
>>RestrictionTracker
>> tracker,
>>OutputReceiver out) throws
>> Exception {
>> if (connection == null) {
>> connection = dataSource.getConnection();
>> }
>>
>> if
>> (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>> LOG.error("Failed to claim restriction");
>> ProcessContinuation.stop();
>> }
>>
>> LOG.info("Preparing query. fetchSize={}, shardSize={},
>> from={}, to={}", fetchSize, shardSize,
>> tracker.currentRestriction().getFrom(),
>> tracker.currentRestriction().getTo());
>>
>> String executeQuery = String.format("SELECT * FROM (%s) t
>> OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;", query.get());
>>
>> // PostgreSQL requires autocommit to be disabled to enable
>> cursor streaming
>> // see
>> https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
>> LOG.debug("Autocommit has been disabled");
>> connection.setAutoCommit(false);
>> try (PreparedStatement statement =
>> connection.prepareStatement(executeQuery, ResultSet.TYPE_FORWARD_ONLY,
>> ResultSet.CONCUR_READ_ONLY)) {
>> statement.setFetchSize(fetchSize);
>> parameterSetter.setParameters(element, statement);
>>
>>
>> statement.setLong(statement.getParameterMetaData().getParameterCount() - 1,
>> tracker.currentRestriction().getFrom());
>>
>> statement.setLong(statement.getParameterMetaData().getParameterCount(),
>> tracker.currentRestriction().getTo() -
>> tracker.currentRestriction().getFrom());
>>
>> queryCounter.inc();
>>
>> int count = 0;
>> long t0 = Instant.now().getMillis();
>>
>> try (ResultSet resultSet = statement.executeQuery()) {
>> queryLatency.update(Instant.now().getMillis() - t0);
>> LOG.info("Query took {} ms",
>> Instant.now().getMillis() - t0);
>>
>> while (resultSet.next()) {
>> out.output(rowMapper.mapRow(resultSet));
>>
>> rowCounter.inc();
>> count++;
>> }
>> LOG.info("Fetched {} rows", count);
>>
>> }
>> }
>> }
>>
>> @SplitRestriction
>> public void splitRestriction(@Element ParameterT element,
>> 
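
The quoted code is cut off at this point in the archive. As a rough orientation, the restriction-splitting piece of such a splittable JDBC read can be sketched as follows; the element type, shard size, and class name are placeholders, not the original author's code:

```
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Rough sketch of the SDF restriction methods for an offset-based JDBC read.
class SplittableJdbcReadSketch extends DoFn<String, String> {
  private final long shardSize = 10_000L;

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String query) {
    // The real code runs a COUNT(*) query here; hard-coded for the sketch.
    return new OffsetRange(0, 1_000_000L);
  }

  @SplitRestriction
  public void splitRestriction(
      @Element String query,
      @Restriction OffsetRange restriction,
      OutputReceiver<OffsetRange> receiver) {
    // Emit fixed-size shards that the runner can hand to different workers.
    for (OffsetRange shard : restriction.split(shardSize, shardSize)) {
      receiver.output(shard);
    }
  }

  @ProcessElement
  public void processElement(
      @Element String query,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // Claim the restriction and run the OFFSET/FETCH query for its range,
    // as in the code quoted above.
    if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
      return;
    }
    // ... execute the query and out.output(...) each mapped row ...
  }
}
```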

Re: JdbcIO parallel read on spark

2021-05-25 Thread Thomas Fredriksen(External)
 try (ResultSet resultSet = statement.executeQuery()) {
> resultSet.next();
>
> long result = resultSet.getLong(1);
> LOG.info("The query results in {} rows", result);
> return new OffsetRange(0, result);
> }
> }
> }
> }
> }
>



On Tue, May 25, 2021 at 6:35 PM Alexey Romanenko 
wrote:

> Hi,
>
> Did you check a Spark DAG if it doesn’t fork branches after "Genereate
> queries” transform?
>
> —
> Alexey
>
> On 24 May 2021, at 20:32, Thomas Fredriksen(External) <
> thomas.fredrik...@cognite.com> wrote:
>
> Hi there,
>
> We are struggling to get the JdbcIO-connector to read a large table on
> spark.
>
> In short - we wish to read a large table (several billion rows), transform
> then write the transformed data to a new table.
>
> We are aware that `JdbcIO.read()` does not parallelize. In order to solve
> this, we attempted to create ranges then generate `limit/offset` queries
> and use `JdbcIO.readAll()` instead.
>
> The overall steps look something like this (sanitized for readability):
>
> ```
> pipeline
>   .apply("Read row count", JdbcIo.read()
> .withQuery("select count(*) from MYTABLE;")
> .withCoder(VarLongCoder.of())
> .withOtherOptions(...))
>   .apply("Genereate queries", ParDo.of(new DoFn() {...}) //
> Outputs table offsets
>   .apply("Read results", JdbcIO.readAll()
> .withCoder(SchemaCoder.of(...))
> .withOutputParallelization(false)
> .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
> .withParameterSetter((element, statement) -> statement.setLong(1,
> element))
> .withOtherOptions(...))
>   .apply("more steps", ...);
> ```
>
> The problem is that this does not seem to parallelize on the spark runner.
> Only a single worker seem to be doing all the work.
>
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`,
> however this did not seem to make a difference.
>
> Our goal is to avoid all data from the query be cached in memory between
> the read and transform operations. This causes OOM-exceptions. Having a
> single worker reading the database is okay as long as other workers can
> process the data as soon as it is read and not having to wait for all the
> data to be ready.
>
> Any advice on how we approach this.
>
> Best Regards
> Thomas Li Fredriksen
>
>
>


Re: JdbcIO parallel read on spark

2021-05-25 Thread Alexey Romanenko
Hi,

Did you check the Spark DAG to see whether it forks into branches after the "Genereate queries” 
transform?

—
Alexey

> On 24 May 2021, at 20:32, Thomas Fredriksen(External) 
>  wrote:
> 
> Hi there,
> 
> We are struggling to get the JdbcIO-connector to read a large table on spark.
> 
> In short - we wish to read a large table (several billion rows), transform 
> then write the transformed data to a new table.
> 
> We are aware that `JdbcIO.read()` does not parallelize. In order to solve 
> this, we attempted to create ranges then generate `limit/offset` queries and 
> use `JdbcIO.readAll()` instead.
> 
> The overall steps look something like this (sanitized for readability):
> 
> ```
> pipeline
>   .apply("Read row count", JdbcIo.read()
> .withQuery("select count(*) from MYTABLE;")
> .withCoder(VarLongCoder.of())
> .withOtherOptions(...))
>   .apply("Genereate queries", ParDo.of(new DoFn() {...}) // 
> Outputs table offsets
>   .apply("Read results", JdbcIO.readAll()
> .withCoder(SchemaCoder.of(...))
> .withOutputParallelization(false)
> .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
> .withParameterSetter((element, statement) -> statement.setLong(1, 
> element))
> .withOtherOptions(...))
>   .apply("more steps", ...);
> ```
> 
> The problem is that this does not seem to parallelize on the spark runner. 
> Only a single worker seem to be doing all the work.
> 
> We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`, 
> however this did not seem to make a difference.
> 
> Our goal is to avoid all data from the query be cached in memory between the 
> read and transform operations. This causes OOM-exceptions. Having a single 
> worker reading the database is okay as long as other workers can process the 
> data as soon as it is read and not having to wait for all the data to be 
> ready.
> 
> Any advice on how we approach this.
> 
> Best Regards
> Thomas Li Fredriksen



JdbcIO parallel read on spark

2021-05-24 Thread Thomas Fredriksen(External)
Hi there,

We are struggling to get the JdbcIO-connector to read a large table on
spark.

In short - we wish to read a large table (several billion rows), transform
then write the transformed data to a new table.

We are aware that `JdbcIO.read()` does not parallelize. In order to solve
this, we attempted to create ranges then generate `limit/offset` queries
and use `JdbcIO.readAll()` instead.

The overall steps look something like this (sanitized for readability):

```
pipeline
  .apply("Read row count", JdbcIo.read()
.withQuery("select count(*) from MYTABLE;")
.withCoder(VarLongCoder.of())
.withOtherOptions(...))
  .apply("Genereate queries", ParDo.of(new DoFn() {...}) //
Outputs table offsets
  .apply("Read results", JdbcIO.readAll()
.withCoder(SchemaCoder.of(...))
.withOutputParallelization(false)
.withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
.withParameterSetter((element, statement) -> statement.setLong(1,
element))
.withOtherOptions(...))
  .apply("more steps", ...);
```

The problem is that this does not seem to parallelize on the spark runner.
Only a single worker seems to be doing all the work.

We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`,
however this did not seem to make a difference.

Our goal is to avoid all data from the query be cached in memory between
the read and transform operations. This causes OOM-exceptions. Having a
single worker reading the database is okay as long as other workers can
process the data as soon as it is read and not having to wait for all the
data to be ready.

Any advice on how we should approach this would be appreciated.

Best Regards
Thomas Li Fredriksen
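
Regarding the fusion-breaking attempt mentioned in this thread: one generic way to force a shuffle between the offset-generating step and the JdbcIO.readAll step is Beam's Reshuffle transform (nominally deprecated but widely used for exactly this purpose). The sketch below only shows where such a reshuffle would sit, with Create and MapElements standing in for the real count query and JDBC read; it is not a claim that it resolves the Spark parallelism issue described above.

```
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReshuffleSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("Generate offsets", Create.of(Arrays.asList(0L, 100_000L, 200_000L)))
        // Force a shuffle so the downstream read is not fused with the
        // single-threaded step that generated the offsets.
        .apply("Break fusion", Reshuffle.viaRandomKey())
        // Stand-in for JdbcIO.readAll(); replace with the real read.
        .apply("Read results (stand-in)",
            MapElements.into(TypeDescriptors.strings())
                .via(offset -> "rows starting at " + offset));

    p.run().waitUntilFinish();
  }
}
```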


Spark Structured Streaming Runner Roadmap

2021-05-20 Thread Yu Zhang
Hi Beam Community, 

Is there any roadmap for the Spark Structured Streaming runner to support streaming and the 
Splittable DoFn API, e.g. a specific timeline or release version? 

Thanks, 
Yu

Re: Does SnowflakeIO support spark runner

2021-05-06 Thread Tao Li
Thanks Kyle!

From: Kyle Weaver 
Date: Thursday, May 6, 2021 at 12:19 PM
To: Tao Li 
Cc: "user@beam.apache.org" , Anuj Gandhi 

Subject: Re: Does SnowflakeIO support spark runner

Yeah, I'm pretty sure that documentation is just misleading. All of the options 
from --runner onward are runner-specific and don't have anything to do with 
Snowflake, so they should probably be removed from the doc.

On Thu, May 6, 2021 at 12:06 PM Tao Li 
mailto:t...@zillow.com>> wrote:
Hi @Kyle Weaver<mailto:kcwea...@google.com>

According to this doc<https://beam.apache.org/documentation/io/built-in/snowflake/>: --runner=

From: Kyle Weaver mailto:kcwea...@google.com>>
Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
mailto:user@beam.apache.org>>
Date: Thursday, May 6, 2021 at 12:01 PM
To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
mailto:user@beam.apache.org>>
Cc: Anuj Gandhi mailto:an...@zillowgroup.com>>
Subject: Re: Does SnowflakeIO support spark runner

As far as I know, it should be supported (Beam's abstract model means IOs 
usually "just work" on all runners). What makes you think it isn't supported?

On Thu, May 6, 2021 at 11:52 AM Tao Li 
mailto:t...@zillow.com>> wrote:
Hi Beam community,

Does SnowflakeIO support spark runner? Seems like only direct runner and 
dataflow runner are supported..

Thanks!


Re: Does SnowflakeIO support spark runner

2021-05-06 Thread Kyle Weaver
Yeah, I'm pretty sure that documentation is just misleading. All of the
options from --runner onward are runner-specific and don't have anything to
do with Snowflake, so they should probably be removed from the doc.
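
To make the point concrete: the runner choice is just another generic pipeline option, independent of which IO the pipeline uses. A minimal sketch, assuming the Spark runner is on the classpath and selected via --runner=SparkRunner on the command line:

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerOptionExample {
  public static void main(String[] args) {
    // e.g. run with: --runner=SparkRunner (plus any runner-specific options)
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);
    // ... SnowflakeIO (or any other IO) transforms go here, unchanged ...
    p.run().waitUntilFinish();
  }
}
```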

On Thu, May 6, 2021 at 12:06 PM Tao Li  wrote:

> Hi @Kyle Weaver 
>
>
>
> According to this doc
> <https://beam.apache.org/documentation/io/built-in/snowflake/>:
> --runner=
>
>
>
> *From: *Kyle Weaver 
> *Reply-To: *"user@beam.apache.org" 
> *Date: *Thursday, May 6, 2021 at 12:01 PM
> *To: *"user@beam.apache.org" 
> *Cc: *Anuj Gandhi 
> *Subject: *Re: Does SnowflakeIO support spark runner
>
>
>
> As far as I know, it should be supported (Beam's abstract model means IOs
> usually "just work" on all runners). What makes you think it isn't
> supported?
>
>
>
> On Thu, May 6, 2021 at 11:52 AM Tao Li  wrote:
>
> Hi Beam community,
>
>
>
> Does SnowflakeIO support spark runner? Seems like only direct runner and
> dataflow runner are supported..
>
>
>
> Thanks!
>
>


Re: Does SnowflakeIO support spark runner

2021-05-06 Thread Tao Li
Hi @Kyle Weaver<mailto:kcwea...@google.com>

According to this 
doc<https://beam.apache.org/documentation/io/built-in/snowflake/>: 
--runner=

From: Kyle Weaver 
Reply-To: "user@beam.apache.org" 
Date: Thursday, May 6, 2021 at 12:01 PM
To: "user@beam.apache.org" 
Cc: Anuj Gandhi 
Subject: Re: Does SnowflakeIO support spark runner

As far as I know, it should be supported (Beam's abstract model means IOs 
usually "just work" on all runners). What makes you think it isn't supported?

On Thu, May 6, 2021 at 11:52 AM Tao Li 
mailto:t...@zillow.com>> wrote:
Hi Beam community,

Does SnowflakeIO support spark runner? Seems like only direct runner and 
dataflow runner are supported..

Thanks!


Re: Does SnowflakeIO support spark runner

2021-05-06 Thread Kyle Weaver
As far as I know, it should be supported (Beam's abstract model means IOs
usually "just work" on all runners). What makes you think it isn't
supported?

On Thu, May 6, 2021 at 11:52 AM Tao Li  wrote:

> Hi Beam community,
>
>
>
> Does SnowflakeIO support spark runner? Seems like only direct runner and
> dataflow runner are supported..
>
>
>
> Thanks!
>


Does SnowflakeIO support spark runner

2021-05-06 Thread Tao Li
Hi Beam community,

Does SnowflakeIO support spark runner? Seems like only direct runner and 
dataflow runner are supported..

Thanks!


Re: [Question] Docker and File Errors with Spark PortableRunner

2021-04-02 Thread Kyle Weaver
Hi Michael,

Your problems in 1. and 2. are related to the artifact staging workflow,
where Beam tries to copy your pipeline’s dependencies to the workers. When
artifacts cannot be fetched because of file system or other issues, the
workers cannot be started successfully. In this case, your pipeline depends
on the pickled main session from estimate_pi.py [1].

In order for artifact staging to work, the job server’s --artifacts-dir
must be accessible by the Spark worker.* Since you start your job server in
a Docker container, /users/mkuchnik/staging is hidden inside that Docker
container’s filesystem, which is not accessible from your network
filesystem. You mentioned in 2. that you tried mounting the directory to
the [worker (?)] containers, but have you tried mounting that directory to
the job server container?

Thanks,
Kyle

* It looks like this is unclear in current documentation, so I will edit it.

[1]
https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/sdks/python/apache_beam/examples/complete/estimate_pi.py#L118


Re: Is there a perf comparison between Beam (on spark) and native Spark?

2021-03-25 Thread Tao Li
Thanks @Alexey Romanenko<mailto:aromanenko@gmail.com> for this info. Do we have a rough idea how Beam (on Spark) compares with native Spark when using TPC-DS or any other benchmark? I am just wondering whether running Beam SQL with the Spark runner will have a similar processing time compared with Spark SQL. Thanks!

From: Alexey Romanenko 
Reply-To: "user@beam.apache.org" 
Date: Tuesday, March 23, 2021 at 12:58 PM
To: "user@beam.apache.org" 
Subject: Re: Is there a perf comparison between Beam (on spark) and native 
Spark?

There is an extension in Beam to support TPC-DS benchmark [1] that basically 
runs TPC-DS SQL queries via Beam SQL. Though, I’m not sure if it runs regularly 
and, IIRC (when I took a look on this last time, maybe I’m mistaken), it 
requires some adjustments to run on any other runners than Dataflow. Also, when 
I tried to run it on SparkRunner many queries failed because of different 
reasons [2].

I believe that if we manage to make it run for most of the queries on any runner, it will be a good addition to the Nexmark benchmark that we have for now, since TPC-DS results can be used to compare with other data processing systems as well.

[1] https://github.com/apache/beam/tree/master/sdks/java/testing/tpcds
[2] https://issues.apache.org/jira/browse/BEAM-9891


On 22 Mar 2021, at 18:00, Tao Li mailto:t...@zillow.com>> 
wrote:

Hi Beam community,

I am wondering if there is a doc comparing the performance of Beam (on Spark) and native Spark for batch processing, for example using the TPC-DS benchmark.

I did find some relevant links like this <https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf> but it's old and it mostly covers the streaming scenarios.

Thanks!



Re: Is there a perf comparison between Beam (on spark) and native Spark?

2021-03-23 Thread Alexey Romanenko
There is an extension in Beam to support TPC-DS benchmark [1] that basically 
runs TPC-DS SQL queries via Beam SQL. Though, I’m not sure if it runs regularly 
and, IIRC (when I took a look on this last time, maybe I’m mistaken), it 
requires some adjustments to run on any other runners than Dataflow. Also, when 
I tried to run it on SparkRunner many queries failed because of different 
reasons [2].

I believe that if we manage to make it run for most of the queries on any runner, it will be a good addition to the Nexmark benchmark that we have for now, since TPC-DS results can be used to compare with other data processing systems as well.

[1] https://github.com/apache/beam/tree/master/sdks/java/testing/tpcds
[2] https://issues.apache.org/jira/browse/BEAM-9891

> On 22 Mar 2021, at 18:00, Tao Li  wrote:
> 
> Hi Beam community,
>  
> I am wondering if there is a doc to compare perf of Beam (on Spark) and 
> native spark for batch processing? For example using TPCDS benmark.
>  
> I did find some relevant links like this 
> <https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf>
>  but it’s old and it mostly covers the streaming scenarios.
>  
> Thanks!



Re: Is there a perf comparison between Beam (on spark) and native Spark?

2021-03-22 Thread Boyuan Zhang
+Kyle Weaver 
Kyle, do you happen to have some information here?

On Mon, Mar 22, 2021 at 10:00 AM Tao Li  wrote:

> Hi Beam community,
>
>
>
> I am wondering if there is a doc to compare perf of Beam (on Spark) and
> native spark for batch processing? For example using TPCDS benmark.
>
>
>
> I did find some relevant links like this
> <https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf>
> but it’s old and it mostly covers the streaming scenarios.
>
>
>
> Thanks!
>


Is there a perf comparison between Beam (on spark) and native Spark?

2021-03-22 Thread Tao Li
Hi Beam community,

I am wondering if there is a doc comparing the performance of Beam (on Spark) and native Spark for batch processing, for example using the TPC-DS benchmark.

I did find some relevant links like 
this<https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf>
 but it’s old and it mostly covers the streaming scenarios.

Thanks!


Re: NotSerializableException in Spark runner

2020-12-01 Thread Ajit Dongre
Yes Alexey, com.walmart.dataplatform.aorta.river.meta.Payload class is 
serializable.

A strange observation is that the overall Spark job is successful, but in the executor logs we continuously observe this exception. Since the code is the same across all nodes, how can the same code work fine on one node and fail on another node with a NotSerializableException?

Regards,
Ajit Dongre

From: Alexey Romanenko 
Reply to: "user@beam.apache.org" 
Date: Tuesday, 1 December 2020 at 11:35 PM
To: "user@beam.apache.org" 
Subject: EXT: Re: NotSerializableException in Spark runner

Could you make sure that the instance of 
com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed 
record, is serializable?


On 1 Dec 2020, at 06:06, Ajit Dongre 
mailto:ajit.don...@walmartlabs.com>> wrote:

Hi all,

I have a pipeline that reads data from Kafka and writes to a file. I am using Beam 2.12 with the Spark runner in Java. While executing it, I am getting the exception below:

org.apache.spark.network.client.ChunkFetchFailureException: Failure while 
fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: 
java.io.NotSerializableException: 
org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
- object not serializable (class: 
org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: 
TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
 timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, 
index=0}})
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, 
(Tag:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
 timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, 
index=0}}))
at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
at 
org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193)
at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608)
at 
org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
at 
org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
at scala.Option.map(Option.scala:146)
at 
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583)
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at 
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
at 
org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
at 
org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
org.apache.spark.network.util.

Re: NotSerializableException in Spark runner

2020-12-01 Thread Alexey Romanenko
Could you make sure that the instance of 
com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed 
record, is serializable?
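
To illustrate what is being asked here: if the failing element type cannot be Java-serialized, one common fix is to make it implement Serializable and, optionally, declare an explicit coder. The class below is a hypothetical stand-in for the user's Payload, not the actual class from this thread:

```
import java.io.Serializable;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.SerializableCoder;

// Hypothetical stand-in for the Payload element type.
@DefaultCoder(SerializableCoder.class) // tells Beam how to encode the element
public class Payload implements Serializable { // lets Spark Java-serialize it too
  private String id;
  private byte[] body;

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }
  public byte[] getBody() { return body; }
  public void setBody(byte[] body) { this.body = body; }
}
```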

> On 1 Dec 2020, at 06:06, Ajit Dongre  wrote:
> 
> Hi all,
>  
> I have pipeline to read data from kafka & write to file.  I am using Beam 
> 2.12 with spark runner in java.  While executing I am getting below exception 
> :
>  
> org.apache.spark.network.client.ChunkFetchFailureException: Failure while 
> fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: 
> java.io.NotSerializableException: 
> org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
> Serialization stack:
> - object not serializable (class: 
> org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: 
> TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
>  timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, 
> timing=EARLY, index=0}})
> - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
> - object (class scala.Tuple2, 
> (Tag:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
>  timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, 
> timing=EARLY, index=0}}))
> at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
> at 
> org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
> at 
> org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193)
> at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
> at 
> org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
> at scala.Option.map(Option.scala:146)
> at 
> org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583)
> at 
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377)
> at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
> at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
> at 
> scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
> at 
> org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
> at 
> org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137)
> at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
> at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
> at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandler

NotSerializableException in Spark runner

2020-11-30 Thread Ajit Dongre
Hi all,

I have a pipeline that reads data from Kafka and writes to a file. I am using Beam 2.12 with the Spark runner in Java. While executing it, I am getting the exception below:

org.apache.spark.network.client.ChunkFetchFailureException: Failure while 
fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: 
java.io.NotSerializableException: 
org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
- object not serializable (class: 
org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: 
TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
 timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, 
index=0}})
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, 
(Tag:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85,
 timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, 
index=0}}))
at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
at 
org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193)
at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608)
at 
org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
at 
org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
at scala.Option.map(Option.scala:146)
at 
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583)
at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at 
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
at 
org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
at 
org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
io.netty.channel.DefaultChannelPipeline.fireChan

Re: is apache beam go sdk supported by spark runner?

2020-11-25 Thread Robert Bradshaw
Yes, it should be for batch (just like for Python). There is ongoing
work to make it work for Streaming as well.

On Sat, Nov 21, 2020 at 2:57 PM Meriem Sara  wrote:
>
> Hello everyone. I am trying to use apache beam with Golang to execute a data 
> processing workflow using apache Spark.  However, I am confused if the go SDK 
> is supported by apache Spark. Could you please provide us wirh more 
> information ?
>
> Thank you


Re: is apache beam go sdk supported by apache Spark runner

2020-11-23 Thread Alexey Romanenko
As far as I can say for the Spark runner, natively it supports only the Java SDK. I'm far from a Go SDK expert, but I think you can run a pipeline written with the Go SDK using the Portable Runner and the Spark runner job server, as is possible for Python SDK pipelines. I'm not sure if it's already officially supported, but I believe the Go SDK people can provide more details on this.

Actually, I went ahead and tried to run it on my side, but with no success so far.

1) Run a Spark Runner Job Server:

$ docker run --net=host apache/beam_spark_job_server:latest
20/11/23 17:50:04 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: 
ArtifactStagingService started on localhost:8098
20/11/23 17:50:04 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: 
Java ExpansionService started on localhost:8097
20/11/23 17:50:04 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: 
JobService started on localhost:8099

2) Run the stringsplit example on master, where I get the following protobuf error:

$ go run sdks/go/examples/stringsplit/stringsplit.go --runner=universal 
--endpoint=localhost:8099

panic: proto: file "v1.proto" is already registered
See 
https://developers.google.com/protocol-buffers/docs/reference/go/faq#namespace-conflict


goroutine 1 [running]:
google.golang.org/protobuf/reflect/protoregistry.glob..func1(0x1d85000, 
0xc00039c700, 0x1d68780, 0xc0003a4430, 0xc00039c700)

/Users/aromanenko/go/src/google.golang.org/protobuf/reflect/protoregistry/registry.go:38
 +0x21f
google.golang.org/protobuf/reflect/protoregistry.(*Files).RegisterFile(0xc80520,
 0x1d87ac0, 0xc00039c700, 0x0, 0x0)

/Users/aromanenko/go/src/google.golang.org/protobuf/reflect/protoregistry/registry.go:111
 +0xb72
google.golang.org/protobuf/internal/filedesc.Builder.Build(0x0, 0x0, 
0xc000230a00, 0x12a, 0x200, 0x10001, 0x0, 0x1d6f3a0, 0xc3c450, 
0x1d79460, ...)

/Users/aromanenko/go/src/google.golang.org/protobuf/internal/filedesc/build.go:113
 +0x1aa
github.com/golang/protobuf/proto.RegisterFile(0x1c60292, 0x8, 0x23708a0, 0xe2, 
0xe2)

/Users/aromanenko/go/src/github.com/golang/protobuf/proto/registry.go:47 +0x147
github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1.init.1()

/Users/aromanenko/go/src/github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1/v1.pb.go:115
 +0x5a
exit status 2

3) 
$ go version
go version go1.15.5 darwin/amd64

I wonder if it's a known issue or it’s something wrong with my environment?


> On 22 Nov 2020, at 00:05, Meriem Sara  wrote:
> 
> Hello everyone. I am trying to use apache beam with Golang to execute a data 
> processing workflow using apache Spark.  However, I am confused if the go SDK 
> is supported by apache Spark. Could you please provide us wirh more 
> information ?
> 
> Thank you



is apache beam go sdk supported by apache Spark runner

2020-11-21 Thread Meriem Sara
Hello everyone. I am trying to use Apache Beam with Golang to execute a data processing workflow using Apache Spark. However, I am confused about whether the Go SDK is supported by Apache Spark. Could you please provide us with more information?

Thank you


is apache beam go sdk supported by spark runner?

2020-11-21 Thread Meriem Sara
Hello everyone. I am trying to use Apache Beam with Golang to execute a data processing workflow using Apache Spark. However, I am confused about whether the Go SDK is supported by Apache Spark. Could you please provide us with more information?

Thank you


Re: Spark Portable Runner + Docker

2020-10-28 Thread Ramesh Mathikumar
Hi Alex -- Please see the details you are looking for.

I am running a sample pipeline and my environment is this.

python "SaiStudy - Apache-Beam-Spark.py" --runner=PortableRunner 
--job_endpoint=192.168.99.102:8099

My Spark is running on a Docker Container and I can see that the JobService is 
running at 8099.

I am getting the following error: grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses" debug_error_string = "{"created":"@1603539936.53600","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":4090,"referenced_errors":[{"created":"@1603539936.53600","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":394,"grpc_status":14}]}"

When I curl to ip:port, I can see the following error from the docker logs Oct 
24, 2020 11:34:50 AM 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.netty.NettyServerTransport 
notifyTerminated INFO: Transport failed 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2Exception:
 Unexpected HTTP/1.x request: GET / at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.readClientPrefaceString(Http2ConnectionHandler.java:302)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:239)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProt
 ection(ByteToMessageDecoder.java:505) at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:37
 4) at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044
 ) at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at 
org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 at java.lang.Thread.run(Thread.java:748)

Help Please.

On 2020/10/28 15:37:22, Alexey Romanenko  wrote: 
> Hi Ramesh,
> 
> By “+ Docker” do you mean Docker SDK Harness or running a Spark in Docker? 
> For the former I believe it works fine. 
> 
> Could you share more details of what kind of error you are facing? 
> 
> > On 27 Oct 2020, at 21:10, Ramesh Mathikumar  wrote:
> > 
> > Hi Group -- Has anyone got this to work? For me it does not either in the 
> > IDE or in Colab. Whats the community take on this one?
> 
> 


Re: Spark Portable Runner + Docker

2020-10-28 Thread Alexey Romanenko
Hi Ramesh,

By “+ Docker” do you mean Docker SDK Harness or running a Spark in Docker? For 
the former I believe it works fine. 

Could you share more details of what kind of error you are facing? 

> On 27 Oct 2020, at 21:10, Ramesh Mathikumar  wrote:
> 
> Hi Group -- Has anyone got this to work? For me it does not either in the IDE 
> or in Colab. Whats the community take on this one?



Spark Portable Runner + Docker

2020-10-27 Thread Ramesh Mathikumar
Hi Group -- Has anyone got this to work? For me it does not either in the IDE 
or in Colab. Whats the community take on this one?


Re: Beam + Spark Portable Runner

2020-10-27 Thread Kyle Weaver
On the Spark runner webpage [1], we recommend running the job server
container with --net=host so all the job server's ports are exposed to the
host. However, host networking is not available if you're using Windows, so
you would have to expose individual ports instead. The job server uses
ports 8099, 8098, and 8097, so you can add them to your Docker command like
this:

docker run -p 8099:8099 -p 8098:8098 -p 8097:8097
apache/beam_spark_job_server:latest
--spark-master-url=spark://localhost:7077

[1] https://beam.apache.org/documentation/runners/spark/

On Sat, Oct 24, 2020 at 4:12 PM Ramesh Mathikumar 
wrote:

> It's very strange.
>
> I run the same code in Colab and it works fine. Just to ensure it's not
> using a local runner - I closed the Flink connector and then it came back
> as connection refused. When I ran it again, but now with Flink running again
> - it worked as a doddle.
>
>
>
> On 2020/10/24 22:56:18, Ankur Goenka  wrote:
> > Can you try running
> > java -jar
> >
> C:\\Users\\rekharamesh/.apache_beam/cache/jars\\beam-runners-flink-1.8-job-server-2.24.0.jar
> > --flink-master http://localhost:8081 --artifacts-dir
> >
> C:\\Users\\REKHAR~1\\AppData\\Local\\Temp\\beam-temp4r_emy7q\\artifactskmzxyxkl
> > --job-port 57115 --artifact-port 0 --expansion-port 0
> >
> > to see why the job server is failing.
> >
> > On Sat, Oct 24, 2020 at 3:52 PM Ramesh Mathikumar <
> meetr...@googlemail.com>
> > wrote:
> >
> > > Hi Ankur,
> > >
> > > Thanks for the prompt response. I suspected a similar issue to to
> validate
> > > that I ran a local cluster of Flink. My Parameters are are follows.
> > >
> > > options = PipelineOptions([
> > > "--runner=FlinkRunner",
> > > "--flink_version=1.8",
> > > "--flink_master=localhost:8081",
> > > "--environment_type=LOOPBACK"
> > > ])
> > >
> > > And when I run it - it does not even reach the cluster - instead it
> bombs
> > > with a following message.
> > >
> > >
> > > WARNING:root:Make sure that locally built Python SDK docker image has
> > > Python 3.6 interpreter.
> > > ERROR:apache_beam.utils.subprocess_server:Starting job service with
> > > ['java', '-jar',
> > >
> 'C:\\Users\\rekharamesh/.apache_beam/cache/jars\\beam-runners-flink-1.8-job-se
> > > rver-2.24.0.jar', '--flink-master', 'http://localhost:8081',
> > > '--artifacts-dir',
> > >
> 'C:\\Users\\REKHAR~1\\AppData\\Local\\Temp\\beam-temp4r_emy7q\\artifactskmzxyxkl',
> > > '--job-port', '57115', '--artifact-port', '0', '--expansion-port', '0']
> > > ERROR:apache_beam.utils.subprocess_server:Error bringing up service
> > > Traceback (most recent call last):
> > >   File
> > >
> "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\utils\subprocess_server.py",
> > > line 88, in start
> > > 'Service failed to start up with error %s' % self._process.poll())
> > > RuntimeError: Service failed to start up with error 1
> > > Traceback (most recent call last):
> > >   File "SaiStudy - Apache-Beam-Spark.py", line 34, in 
> > > | 'Write results' >> beam.io.WriteToText(outputs_prefix)
> > >   File
> > >
> "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\pipeline.py",
> > > line 555, in __exit__
> > > self.result = self.run()
> > >   File
> > >
> "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\pipeline.py",
> > > line 534, in run
> > > return self.runner.run_pipeline(self, self._options)
> > >   File
> > >
> "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\runners\portability\flink_runner.py",
> > > line 49, in run_pipeline
> > >
> > > return super(FlinkRunner, self).run_pipeline(pipeline, options)
> > >   File
> > >
> "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\runners\portability\portable_runner.py",
> > > line 388, in run_pipe
> > > line
> > > job_service_handle = self.create_job_service(options)
> > >   File
> > >
> "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\runners\portability\portable_runner.py",
> > > line 304, in create_j
> > > ob_service
&
