Survey/Pulse: Beam on Flink/Spark/Samza/etc
Curious who is using Beam on runners other than Dataflow. Please respond either on-list or to me directly ... Mostly I'm just curious to what extent Beam is fulfilling its promise of runner agnosticism. Getting good data on that is hard, so anecdotes would be very welcome!
[Question] Apache Beam Spark Runner Support - Spark 3.5, Scala 2.13 Environment
Hello,

Does the latest Apache Beam version support a Spark 3.5, Scala 2.13 environment for the Spark runner?

Based on initial analysis, the Spark runner supports Spark 3.5.0:
Reference: https://github.com/apache/beam/blob/f660f49b6cf5d241285b156ccc4a5f52d82cfa63/runners/spark/3/build.gradle#L36

I would like to know about compatibility with Scala 2.13. Based on the references below, support extends only to Scala 2.12 (the libraries are compiled against Scala 2.12):
https://github.com/apache/beam/blob/0dc330e6d53cce51b13bcb652e80859c1b3a5975/runners/spark/3/build.gradle#L24
https://mvnrepository.com/artifact/org.apache.beam/beam-runners-spark-3/2.54.0

Any insight into future roadmap plans for Scala 2.13 support would be very helpful.

Thanks,
Sri Ganesh V
Beam on spark yarn could only start one worker
Hello,

We are using Beam with pipelines written in Python, and we want to deploy to Spark on a YARN cluster. But we found that it only starts one Spark worker, and the Beam docker container is started only on that worker.

The printed Beam pipeline options were:

{key:"beam:option:spark_master:v1" value:{string_value:"local[4]"}}
{key:"beam:option:direct_runner_use_stacked_bundle:v1" value:{bool_value:true}}

Our steps:

First, we generate a jar file from the Python pipeline:

python3.7 -m beam_demo \
  --spark_version=3 \
  --runner=SparkRunner \
  --spark_job_server_jar=/home/hadoop/beam-runners-spark-3-job-server-2.52.0.jar \
  --sdk_container_image=apache/beam_python3.7_sdk_boto3:2.45.0 \
  --sdk_harness_container_image=apache/beam_python3.7_sdk_boto3:2.45.0 \
  --worker_harness_container_image=apache/beam_python3.7_sdk_boto3:2.45.0 \
  --environment_type=DOCKER --environment_config=apache/beam_python3.7_sdk_boto3:2.45.0 \
  --sdk_location=container \
  --num_workers=2 \
  --output_executable_path=jars/beam_demo.jar

Second, we submit it to Spark:

spark-submit --master yarn --deploy-mode cluster jars/beam_demo.jar

Our questions are:
1. We configured the SparkRunner. Why does the output show direct_runner_use_stacked_bundle?
2. We submitted the job to Spark on YARN. Why is the printed spark_master local[4]?
3. Can Python Beam on a Spark YARN cluster start multiple workers?

Could you please help check these questions? Maybe we are missing something. Thank you very much for your help.
SparkRunner / PortableRunner Spark config besides Spark Master
Hi,

I'm struggling to figure out the best way to make Python Beam jobs execute on a Spark cluster running on Kubernetes. Unfortunately, the available documentation is incomplete and confusing at best.

The most flexible way I found was to compile a JAR from my Python job and submit that via spark-submit. Unfortunately, this seems to be extremely buggy, and I cannot get it to feed logs from the SDK containers back to the Spark executors and on to the driver. See: https://github.com/apache/beam/issues/29683

The other way would be to use a Beam job server, but here I cannot find a sensible way to set any Spark config options besides the master URL. I have a spark-defaults.conf with vital configuration, which needs to be passed to the job. I see two ways forward here:

1) I could let users run the job server locally in a Docker container. This way they could potentially mount their spark-defaults.conf somewhere, but I don't really see where (pointers here?). They would also need to mount their Kubernetes access credentials somehow; otherwise the job server cannot access the cluster.

2) I could run the job server in the Kubernetes cluster, which would resolve the Kubernetes credential issue but not the Spark config issue. And even if that were solved, I would now force all users to use the same Spark config (not ideal).

Is there a better way? From what I can see, the compiled JAR is the only viable option, but the log issue is a deal breaker.

Thanks
Janek
Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment
I already added the Spark 3.5.0 version to the Beam Spark version tests [1] and I didn't notice any regression.

The next Beam release (2.53.0) should be available in a couple of months, depending on the release preparation process.

— Alexey

[1] https://github.com/apache/beam/pull/29327

> On 9 Nov 2023, at 06:37, Giridhar Addepalli wrote:
>
> Thank you Alexey for sharing the details.
>
> Can you please let us know if you are planning to add the Spark 3.5.0
> compatibility test as part of Beam 2.53.0 or not.
> If so, approximately what is the timeline we are looking at for the Beam 2.53.0
> release?
> https://github.com/apache/beam/milestone/17
>
> Thanks,
> Giridhar.
>
> On Tue, Nov 7, 2023 at 6:24 PM Alexey Romanenko wrote:
>> Hi Giridhar,
>>
>>> On 4 Nov 2023, at 08:04, Giridhar Addepalli wrote:
>>>
>>> Thank you Alexey for the response.
>>>
>>> We are using Beam 2.41.0 with Spark 3.3.0 cluster.
>>> We did not run into any issues.
>>> is it because in Beam 2.41.0, compatibility tests were run against spark
>>> 3.3.0 ?
>>> https://github.com/apache/beam/blob/release-2.41.0/runners/spark/3/build.gradle
>>
>> Correct.
>> There are some incompatibilities between Spark 3.1/3.2/3.3 versions, and we
>> fixed this for the Spark runner in Beam 2.41 to make it possible to compile and
>> run with different Spark versions. That was the goal of these compatibility
>> tests.
>>
>> Fixes #22156: Fix Spark3 runner to compile against Spark 3.2/3.3 by mosche ·
>> Pull Request #22157 · apache/beam
>> <https://github.com/apache/beam/pull/22157>
>>
>>> If so, since compatibility tests were not run against Spark 3.5.0 even in
>>> latest release of Beam 2.52.0, is it not advised to use Beam 2.52.0 with
>>> Spark 3.5.0 cluster ?
>>
>> I’d say, for now it's up to the user to test and run it since it was not tested
>> on Beam CI.
>> I’m going to add this version for future testing.
>>
>> —
>> Alexey
>>
>>> Thanks,
>>> Giridhar.
>>>
>>> On 2023/11/03 13:05:45 Alexey Romanenko wrote:
>>> > AFAICT, the latest tested (compatibility tests) version for now is 3.4.1
>>> > [1] We may try to add 3.5.x version there.
>>> >
>>> > I believe that ValidatesRunner tests are run only against default Spark
>>> > 3.2.2 version.
>>> >
>>> > — Alexey
>>> >
>>> > [1]
>>> > https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36
>>> >
>>> > > On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman wrote:
>>> > >
>>> > > Does Apache Beam version (2.41.0) or latest (2.51.0) support Spark 3.5
>>> > > environment for spark runner ?
>>> > >
>>> > > Apache Beam - Spark Runner Documentation states -
>>> > > The Spark runner currently supports Spark’s 3.2.x branch
>>> > >
>>> > > Thanks
>>> > > Sri Ganesh V
RE: Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment
Thank you Alexey for the response.

We are using Beam 2.41.0 with a Spark 3.3.0 cluster and did not run into any issues. Is it because, in Beam 2.41.0, compatibility tests were run against Spark 3.3.0?
https://github.com/apache/beam/blob/release-2.41.0/runners/spark/3/build.gradle

If so, since compatibility tests were not run against Spark 3.5.0 even in the latest release (Beam 2.52.0), is it not advised to use Beam 2.52.0 with a Spark 3.5.0 cluster?

Thanks,
Giridhar.

On 2023/11/03 13:05:45 Alexey Romanenko wrote:
> AFAICT, the latest tested (compatibility tests) version for now is 3.4.1 [1]. We may try to add the 3.5.x version there.
>
> I believe that ValidatesRunner tests are run only against the default Spark 3.2.2 version.
>
> — Alexey
>
> [1] https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36
>
> > On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman wrote:
> > Does Apache Beam version (2.41.0) or latest (2.51.0) support Spark 3.5 environment for spark runner ?
> > Apache Beam - Spark Runner Documentation states -
> > The Spark runner currently supports Spark’s 3.2.x branch
> > Thanks
> > Sri Ganesh V
Re: [Question] Apache Beam Spark Runner Support - Spark 3.5 Environment
AFAICT, the latest tested (compatibility tests) version for now is 3.4.1 [1]. We may try to add the 3.5.x version there.

I believe that ValidatesRunner tests are run only against the default Spark 3.2.2 version.

— Alexey

[1] https://github.com/apache/beam/blob/2aaf09c0eb6928390d861ba228447338b8ca92d3/runners/spark/3/build.gradle#L36

> On 3 Nov 2023, at 05:06, Sri Ganesh Venkataraman wrote:
>
> Does Apache Beam version (2.41.0) or latest (2.51.0) support Spark 3.5
> environment for spark runner ?
>
> Apache Beam - Spark Runner Documentation states -
> The Spark runner currently supports Spark’s 3.2.x branch
>
> Thanks
> Sri Ganesh V
[Question] Apache Beam Spark Runner Support - Spark 3.5 Environment
Does Apache Beam version 2.41.0 or the latest (2.51.0) support a Spark 3.5 environment for the Spark runner?

The Apache Beam Spark Runner documentation states: "The Spark runner currently supports Spark’s 3.2.x branch."

Thanks
Sri Ganesh V
Re: Passing "conf" arguments using a portable runner in Java (spark job runner)
Hi Jon,

Sorry for the late reply. A while ago I was struggling with this as well. Unfortunately, there’s no direct way to do this per pipeline. However, you can set default arguments by passing them to the job service container using the environment variable _JAVA_OPTIONS.

I hope this still helps!

Cheers, Moritz

On 30.06.23, 20:29, "Jon Molle via user" wrote:

Hi,

I'm having trouble trying to figure out where to pass "conf" spark-submit arguments to a spark job service. I don't particularly care at this point whether the job service uses the same set of args for all jobs passed to the service; I'm just having trouble finding where I can send these args so they get picked up by the service.

For reference, I'm using the portable pipeline options (i.e. the portable runner, job endpoint, and DOCKER environment type). Has anyone tried this, specifically in Java? I'm actually writing in Kotlin, but all the examples of the portable runner are in Python, which is significantly different from the Java design.

Thanks!
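A minimal sketch of what the _JAVA_OPTIONS approach could look like when starting the job server container. The image tag, master URL, and Spark property values below are assumptions to adapt; the command is only assembled and printed here so it can be inspected before running.

```python
# Hypothetical sketch: build a docker command that starts the Beam Spark job
# server with Spark settings passed as JVM system properties via _JAVA_OPTIONS.
# Image tag, master URL, and property values are placeholders.
import shlex

spark_props = "-Dspark.executor.memory=2g -Dspark.eventLog.enabled=true"
cmd = [
    "docker", "run", "--net=host",
    "-e", f"_JAVA_OPTIONS={spark_props}",
    "apache/beam_spark3_job_server:2.54.0",
    "--spark-master-url=spark://spark-master:7077",
]
# Print the command for inspection; pass `cmd` to subprocess.run(cmd) to launch.
print(shlex.join(cmd))
```

Since _JAVA_OPTIONS applies to the whole JVM, these settings become defaults for every job submitted to that job server, which matches the "same args for all jobs" caveat above.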
Re: Does beam spark runner support yarn-cluster mode
Yeah, I think you could check this document out:
https://beam.apache.org/documentation/runners/spark/#running-on-dataproc-cluster-yarn-backed

The Spark runner should support running in yarn-cluster mode.

On Tue, Jul 11, 2023 at 7:58 PM Jeff Zhang wrote:

> Hi all,
>
> I didn't find much material about spark runner, just wondering
> whether beam spark runner support yarn-cluster mode. Thanks.
>
> --
> Best Regards
>
> Jeff Zhang
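For reference, the flow in that doc boils down to building a self-executing jar from the Python pipeline and handing it to spark-submit in cluster mode. A sketch of the two commands (the module name my_pipeline and file names are placeholders; the commands are printed rather than executed):

```python
# Hypothetical sketch of the two-step yarn-cluster flow: first build a
# self-executing jar from the Python pipeline, then submit it to YARN.
import shlex

build_cmd = [
    "python", "-m", "my_pipeline",
    "--runner=SparkRunner",
    "--spark_version=3",
    "--output_executable_path=job.jar",
]
submit_cmd = ["spark-submit", "--master", "yarn",
              "--deploy-mode", "cluster", "job.jar"]

# Printed for illustration; run each with subprocess.run(...) on a real cluster.
for c in (build_cmd, submit_cmd):
    print(shlex.join(c))
```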
Does beam spark runner support yarn-cluster mode
Hi all,

I didn't find much material about the Spark runner; just wondering whether the Beam Spark runner supports yarn-cluster mode. Thanks.

--
Best Regards

Jeff Zhang
Passing "conf" arguments using a portable runner in Java (spark job runner)
Hi,

I'm having trouble trying to figure out where to pass "conf" spark-submit arguments to a spark job service. I don't particularly care at this point whether the job service uses the same set of args for all jobs passed to the service; I'm just having trouble finding where I can send these args so they get picked up by the service.

For reference, I'm using the portable pipeline options (i.e. the portable runner, job endpoint, and DOCKER environment type). Has anyone tried this, specifically in Java? I'm actually writing in Kotlin, but all the examples of the portable runner are in Python, which is significantly different from the Java design.

Thanks!
[Question]dataproc(gce)+spark+beam(python) seems only use one worker
Hi Beam Community,

I followed this doc (https://beam.apache.org/documentation/runners/spark/#running-on-dataproc-cluster-yarn-backed) to try to run my Beam pipeline on Dataproc. I created a Dataproc cluster (1 master, 2 workers), but I could only see one worker working (using `docker ps` to check the worker nodes). I also increased to 4 workers and got the same result. Are there some parameters that I could use, or am I doing something wrong? I also tried dataproc(gce)+flink+beam(python); using the option `parallelism` did not seem to work either.

This is my code:

```
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=SparkRunner',
    '--output_executable_path=job.jar',
    '--spark_version=3',
])

class SplitWords(beam.DoFn):
    def __init__(self, delimiter=','):
        self.delimiter = delimiter

    def process(self, text):
        import time
        time.sleep(10)
        for word in text.split(self.delimiter):
            yield word

logging.getLogger().setLevel(logging.INFO)

data = ['Strawberry,Carrot,Eggplant', 'Tomato,Potato'] * 800

with beam.Pipeline(options=options) as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(data)
        | 'Split words' >> beam.ParDo(SplitWords(','))
        | beam.Map(print))
```

Thanks
Yuwen Zhao
[Announcement] Planned removal of Spark 2 runner support in 2.46.0
Dear All,

The runner for Spark 2 was deprecated quite a while back, in August 2022, with the release of Beam 2.41.0 [1]. We’re planning to move ahead with this and finally remove support for Spark 2 (beam-runners-spark), to only maintain support for Spark 3 (beam-runners-spark-3) going forward.

Note, Spark 2.4.8 is the latest release of Spark 2. It was released in May 2021 and no further releases of 2.4.x should be expected, even for bug fixes [2].

If you are still relying on the Spark 2 runner and there are good reasons not to migrate to Spark 3, please reach out and we might reconsider this.

Kind regards,
Moritz

[1] https://github.com/apache/beam/blob/master/CHANGES.md#2410---2022-08-23
[2] https://spark.apache.org/versioning-policy.html
Re: Metrics in Beam+Spark
Yes, that’s exactly what I was referring to.

A hopefully easy way to avoid this problem might be to change the Spark configuration to use the following:

--conf "spark.metrics.conf.driver.sink.jmx.class"="com.salesforce.einstein.data.platform.connectors.JmxSink"
--conf "spark.metrics.conf.executor.sink.jmx.class"="org.apache.spark.metrics.sink.JmxSink"

Beam metrics are only exposed on the driver, so it’s enough to use your custom sink on the driver. Those classpath issues are limited to executor nodes if I remember right (and I’d be surprised to see the same on the driver). Though, I haven’t tested it.

By distributing such code by other means, I meant adding the relevant classes to the system classpath of the executors ($SPARK_HOME/jars). How to do that depends heavily on your infrastructure. For example, on Kubernetes it could be a base image that already contains the custom sink; on EMR it could be a bootstrap action that copies the sink from S3. Of course, that’s not great because it affects everything running on the cluster, with all the potential problems this creates.

/ Moritz

On 16.07.22, 07:54, "Yushu Yao" wrote:

Hi Moritz,

Could you also elaborate a bit about "distributing such code by other means"? I wrapped JmxSink (just like CsvSink for beam) and now get the following error:

ERROR MetricsSystem: Sink class com.salesforce.einstein.data.platform.connectors.JmxSink cannot be instantiated
Caused by: java.lang.ClassNotFoundException: com.salesforce.einstein.data.platform.connectors.JmxSink

I guess this is the classpath issue you were referring to above. Any hints on how to fix it will be greatly appreciated.

Thanks!
-Yushu
Re: Metrics in Beam+Spark
Hi Moritz,

> When dealing with custom sinks, it’s fairly common to run into classpath
> issues :/ The metric system is loaded when an executor starts up, by then
> the application classpath isn’t available yet. Typically, that means
> distributing such code by other means.

Could you also elaborate a bit about "distributing such code by other means"? I wrapped JmxSink (just like CsvSink for beam) and now get the following error:

ERROR MetricsSystem: Sink class com.salesforce.einstein.data.platform.connectors.JmxSink cannot be instantiated
Caused by: java.lang.ClassNotFoundException: com.salesforce.einstein.data.platform.connectors.JmxSink

I guess this is the classpath issue you were referring to above. Any hints on how to fix it will be greatly appreciated.

Thanks!
-Yushu
Re: Metrics in Beam+Spark
Yes, you would have to wrap the Spark JmxSink the same way it’s done for the CSV or Graphite one, see [1]. This is necessary to expose gauges provided by Beam to the sinks.

However, if you are on Spark 3, it’s possible to use the new plugin framework of Spark (see [2]). That’s something I was planning to work on but haven’t found time for yet. Using a driver plugin, the respective gauges could be registered in Spark’s internal metrics registry (available from PluginContext [3]) without using custom sinks.

Btw, all of this is happening on the driver. So actually, there’s no trouble to expect with the executor classpath.

/ Moritz

[1] https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.java#L35
[2] https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/SparkPlugin.html
[3] https://spark.apache.org/docs/3.1.2/api/java/org/apache/spark/api/plugin/PluginContext.html#metricRegistry

On 15.07.22, 07:47, "Yushu Yao" wrote:

Thanks Moritz! We are using JMX to ship the metrics out of Spark. Most of the Spark built-in driver and executor metrics are going out fine. Does this require us to make another sink?

-Yushu
Re: Metrics in Beam+Spark
Thanks Moritz! We are using JMX to ship the metrics out of Spark. Most of the Spark built-in driver and executor metrics are going out fine. Does this require us to make another sink?

-Yushu

On Thu, Jul 14, 2022 at 1:05 PM Moritz Mack wrote:

> Hi Yushu,
>
> Wondering, how did you configure your Spark metrics sink? And what version
> of Spark are you using?
>
> Key is to configure Spark to use one of the sinks provided by Beam, e.g.:
>
> "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink"
>
> Currently there’s support for CSV and Graphite, but it’s simple to support
> others. The sinks typically wrap the corresponding Spark sinks and
> integrate with the metrics registry of the Spark metric system.
>
> https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html
>
> When dealing with custom sinks, it’s fairly common to run into classpath
> issues :/ The metric system is loaded when an executor starts up, by then
> the application classpath isn’t available yet. Typically, that means
> distributing such code by other means.
>
> /Moritz
Re: Metrics in Beam+Spark
Hi Yushu, Wondering, how did you configure your Spark metrics sink? And what version of Spark are you using? Key is to configure Spark to use one of the sinks provided by Beam, e.g.: "spark.metrics.conf.*.sink.csv.class"="org.apache.beam.runners.spark.metrics.sink.CsvSink" Currently there’s support for CSV and Graphite, but it’s simple to support others. The sinks typically wrap the corresponding Spark sinks and integrate with the Metrics registry of the Spark metric system. https://beam.apache.org/releases/javadoc/2.40.0/org/apache/beam/runners/spark/metrics/sink/package-summary.html When dealing with custom sinks, it’s fairly common to run into classpath issues :/ The metric system is loaded when an executor starts up, by then the application classpath isn’t available yet. Typically, that means distributing such code by other means. /Moritz On 14.07.22, 21:26, "Yushu Yao" wrote: Hi Team, Does anyone have a working example of a beam job running on top of spark? So that I can use the beam metric syntax and the metrics will be shipped out via spark's infra? The only thing I achieved is to be able to queryMetrics() ZjQcmQRYFpfptBannerStart This Message Is From an External Sender This message came from outside your organization. Exercise caution when opening attachments or clicking any links. ZjQcmQRYFpfptBannerEnd Hi Team, Does anyone have a working example of a beam job running on top of spark? So that I can use the beam metric syntax and the metrics will be shipped out via spark's infra? The only thing I achieved is to be able to queryMetrics() every half second and copy all the metrics into the spark metrics. Wondering if there is a better way? Thanks! 
-Yushu MetricQueryResults metrics = pipelineResult .metrics() .queryMetrics( MetricsFilter.builder() .addNameFilter(MetricNameFilter.inNamespace(namespace)) .build()); for (MetricResult cc : metrics.getGauges()) { LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted()); com.codahale.metrics.Gauge gauge = new SimpleBeamGauge(cc.getAttempted().getValue()); try { String name = metricName(cc.getKey(), addStepName, addNamespace); if (registry.getNames().contains(name)) { LOGGER.info("Removing metric {}", name); registry.remove(name); } registry.register(name, gauge); } catch (IllegalArgumentException e) { LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e); } }
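[Editor's note] Moritz's suggestion above can be spelled out as a Spark metrics configuration file. A sketch only: the sink class name comes from the thread; the directory, period, and unit values are illustrative assumptions passed through to the underlying Spark CsvSink.

```properties
# metrics.properties: route all Spark metric instances to Beam's CSV sink,
# which wraps Spark's CsvSink and picks up Beam metrics from the registry.
*.sink.csv.class=org.apache.beam.runners.spark.metrics.sink.CsvSink
# Illustrative options (assumed), forwarded to the wrapped Spark CsvSink:
*.sink.csv.directory=/tmp/beam-metrics
*.sink.csv.period=10
*.sink.csv.unit=seconds
```

The same keys can alternatively be set directly on the SparkConf with the spark.metrics.conf. prefix, as in the message above.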
Metrics in Beam+Spark
Hi Team, Does anyone have a working example of a beam job running on top of spark? So that I can use the beam metric syntax and the metrics will be shipped out via spark's infra? The only thing I achieved is to be able to queryMetrics() every half second and copy all the metrics into the spark metrics. Wondering if there is a better way? Thanks! -Yushu

MetricQueryResults metrics =
    pipelineResult
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace(namespace))
                .build());

for (MetricResult<GaugeResult> cc : metrics.getGauges()) {
  LOGGER.info("Adding Gauge: {} : {}", cc.getName(), cc.getAttempted());
  com.codahale.metrics.Gauge gauge =
      new SimpleBeamGauge(cc.getAttempted().getValue());
  try {
    String name = metricName(cc.getKey(), addStepName, addNamespace);
    if (registry.getNames().contains(name)) {
      LOGGER.info("Removing metric {}", name);
      registry.remove(name);
    }
    registry.register(name, gauge);
  } catch (IllegalArgumentException e) {
    LOGGER.warn("Duplicated metrics found. Try turning on addStepName=true.", e);
  }
}
Re: RDD (Spark dataframe) into a PCollection?
Hi Yushu, Have a look at org.apache.beam.runners.spark.translation.EvaluationContext in the Spark runner. It maintains that mapping between PCollections and RDDs (wrapped in the BoundedDataset helper). As Reuven just pointed out, values are timestamped (and windowed) in Beam, therefore BoundedDataset expects a JavaRDD<WindowedValue<T>>. The idea is to map your external RDD to a new PCollection (PCollection.createPrimitiveOutputInternal) in the EvaluationContext (and vice versa). You can then apply Beam transforms to that PCollection (and with that effectively to the mapped RDD) as you are used to. Obviously, there’s a few steps necessary as EvaluationContext isn’t easily accessible from the outside. Just having a quick look, creating a RddSource for Spark RDDs seems also not too bad. That would allow you to do something like this: pipeline.apply(Read.from(new RddSource<>(javaRdd.rdd(), coder))); Though I haven’t done much testing beyond a quick experiment. One notable disadvantage of that approach is that all RDD partition data must be broadcasted to all workers to then pick the right partition. This should mostly be fine, but some types of partitions carry data as well … https://gist.github.com/mosche/5c1ef8ba281a9a08df1ec67fac700d03 /Moritz On 24.05.22, 16:46, "Yushu Yao" wrote: Looks like it's a valid use case. Wondering anyone can give some high level guidelines on how to implement this? I can give it a try. 
-Yushu On Tue, May 24, 2022 at 2:42 AM Jan Lukavský <je...@seznam.cz> wrote: +dev@beam <d...@beam.apache.org> On 5/24/22 11:40, Jan Lukavský wrote: Hi, I think this feature is valid. Every runner for which Beam is not a 'native' SDK uses some form of translation context, which maps PCollection to internal representation of the particular SDK of the runner (RDD in this case). It should be possible to "import" an RDD into the specific runner via something like SparkRunner runner = ; PCollection<...> pCollection = runner.importRDD(rdd); and similarly RDD<...> rdd = runner.exportRDD(pCollection); Yes, apparently this would be runner specific, but that is the point, actually. This would enable using features and libraries, that Beam does not have, or micro-optimize some particular step using runner-specific features, that we don't have in Beam. We actually had this feature (at least in a prototype) many years ago when Euphoria was a separate project. Jan On 5/23/22 20:58, Alexey Romanenko wrote: On 23 May 2022, at 20:40, Brian Hulette <bhule...@google.com> wrote: Yeah I'm not sure of any simple way to do this. I wonder if it's worth considering building some Spark runner-specific feature around this, or at least packaging up Robert's proposed solution? I’m not sure that a runner specific feature is a good way to do this since the other runners won’t be able to support it or I’m missing something? There could be other interesting integrations in this space too, e.g. using Spark RDDs as a cache for Interactive Beam. Another option could be to add something like SparkIO (or FlinkIO/whatever) to read/write data from/to Spark data structures for such cases (Spark schema to Beam schema convention also could be supported). And dreaming a bit more, for those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors could support the push-downs of pure Spark pipelines and then use the result downstream in Beam. 
— Alexey Brian On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw <rober...@google.com> wrote: The easiest way to do this would be to write the RDD somewhere then read it from Beam. On Mon, May 23, 2022 at 9:39 AM Yushu Yao <yao.yu...@gmail.com> wrote: > > Hi Folks, > > I know this is not the optimal way to use beam :-) But assume I only use the > spark runner. > > I have a spark library (very complex) that emits a spark dataframe (or RDD). > I also have an existing complex beam pipeline that can do post processing on > the data inside the dataframe. > > However, the beam part needs a pcollection to start with. The question is, > how can I convert a spark RDD into a pcollection? > > Thanks > -Yushu >
Re: RDD (Spark dataframe) into a PCollection?
Yes, I suppose it might be more complex than the code snippet, that was just to demonstrate the idea. Also the "exportRDD" would probably return WindowedValue instead of plain T. On 5/24/22 17:23, Reuven Lax wrote: Something like this seems reasonable. Beam PCollections also have a timestamp associated with every element, so the importRDD function probably needs a way to specify the timestamp (could be an attribute name for dataframes or a timestamp extraction function for regular RDDs). On Tue, May 24, 2022 at 2:40 AM Jan Lukavský wrote: Hi, I think this feature is valid. Every runner for which Beam is not a 'native' SDK uses some form of translation context, which maps PCollection to internal representation of the particular SDK of the runner (RDD in this case). It should be possible to "import" an RDD into the specific runner via something like SparkRunner runner = ; PCollection<...> pCollection = runner.importRDD(rdd); and similarly RDD<...> rdd = runner.exportRDD(pCollection); Yes, apparently this would be runner specific, but that is the point, actually. This would enable using features and libraries, that Beam does not have, or micro-optimize some particular step using runner-specific features, that we don't have in Beam. We actually had this feature (at least in a prototype) many years ago when Euphoria was a separate project. Jan On 5/23/22 20:58, Alexey Romanenko wrote: On 23 May 2022, at 20:40, Brian Hulette wrote: Yeah I'm not sure of any simple way to do this. I wonder if it's worth considering building some Spark runner-specific feature around this, or at least packaging up Robert's proposed solution? I’m not sure that a runner specific feature is a good way to do this since the other runners won’t be able to support it or I’m missing something? There could be other interesting integrations in this space too, e.g. using Spark RDDs as a cache for Interactive Beam. 
Another option could be to add something like SparkIO (or FlinkIO/whatever) to read/write data from/to Spark data structures for such cases (Spark schema to Beam schema convention also could be supported). And dreaming a bit more, for those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors could support the push-downs of pure Spark pipelines and then use the result downstream in Beam. — Alexey Brian On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw wrote: The easiest way to do this would be to write the RDD somewhere then read it from Beam. On Mon, May 23, 2022 at 9:39 AM Yushu Yao wrote: > > Hi Folks, > > I know this is not the optimal way to use beam :-) But assume I only use the spark runner. > > I have a spark library (very complex) that emits a spark dataframe (or RDD). > I also have an existing complex beam pipeline that can do post processing on the data inside the dataframe. > > However, the beam part needs a pcollection to start with. The question is, how can I convert a spark RDD into a pcollection? > > Thanks > -Yushu >
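[Editor's note] Reuven's timestamp point above is the crux of any importRDD: every element must become a WindowedValue before the Spark runner can treat the RDD as a PCollection. A non-runnable sketch of that wrapping step; importRDD is the hypothetical method proposed in this thread, not an existing Beam API, and MyRecord/getEventTimeMillis are placeholder names, while WindowedValue.timestampedValueInGlobalWindow is a real Beam helper.

```java
// Sketch only: assumes access to a Spark runner translation context that can
// register the wrapped RDD as a PCollection (not publicly accessible today).
JavaRDD<MyRecord> rdd = /* external Spark RDD */;

// Attach a timestamp to every element, as Beam requires; here via a
// user-supplied extraction function, per Reuven's suggestion.
JavaRDD<WindowedValue<MyRecord>> windowed =
    rdd.map(r -> WindowedValue.timestampedValueInGlobalWindow(
        r, new org.joda.time.Instant(r.getEventTimeMillis())));

// Hypothetical API from this thread:
// PCollection<MyRecord> pc = runner.importRDD(windowed);
```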
Re: RDD (Spark dataframe) into a PCollection?
Looks like it's a valid use case. Wondering anyone can give some high level guidelines on how to implement this? I can give it a try. -Yushu On Tue, May 24, 2022 at 2:42 AM Jan Lukavský wrote: > +dev@beam > On 5/24/22 11:40, Jan Lukavský wrote: > > Hi, > I think this feature is valid. Every runner for which Beam is not a > 'native' SDK uses some form of translation context, which maps PCollection > to internal representation of the particular SDK of the runner (RDD in this > case). It should be possible to "import" an RDD into the specific runner > via something like > > SparkRunner runner = ; > PCollection<...> pCollection = runner.importRDD(rdd); > > and similarly > > RDD<...> rdd = runner.exportRDD(pCollection); > > Yes, apparently this would be runner specific, but that is the point, > actually. This would enable using features and libraries, that Beam does > not have, or micro-optimize some particular step using runner-specific > features, that we don't have in Beam. We actually had this feature (at > least in a prototype) many years ago when Euphoria was a separate project. > > Jan > On 5/23/22 20:58, Alexey Romanenko wrote: > > > > On 23 May 2022, at 20:40, Brian Hulette wrote: > > Yeah I'm not sure of any simple way to do this. I wonder if it's worth > considering building some Spark runner-specific feature around this, or at > least packaging up Robert's proposed solution? > > > I’m not sure that a runner specific feature is a good way to do this since > the other runners won’t be able to support it or I’m missing something? > > There could be other interesting integrations in this space too, e.g. > using Spark RDDs as a cache for Interactive Beam. > > > Another option could be to add something like SparkIO (or > FlinkIO/whatever) to read/write data from/to Spark data structures for such > cases (Spark schema to Beam schema convention also could be supported). And > dreaming a bit more, for those who need to have a mixed pipeline (e.g. 
> Spark + Beam) such connectors could support the push-downs of pure Spark > pipelines and then use the result downstream in Beam. > > — > Alexey > > > > Brian > > On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw > wrote: > >> The easiest way to do this would be to write the RDD somewhere then >> read it from Beam. >> >> On Mon, May 23, 2022 at 9:39 AM Yushu Yao wrote: >> > >> > Hi Folks, >> > >> > I know this is not the optimal way to use beam :-) But assume I only >> use the spark runner. >> > >> > I have a spark library (very complex) that emits a spark dataframe (or >> RDD). >> > I also have an existing complex beam pipeline that can do post >> processing on the data inside the dataframe. >> > >> > However, the beam part needs a pcollection to start with. The question >> is, how can I convert a spark RDD into a pcollection? >> > >> > Thanks >> > -Yushu >> > >> > >
Re: RDD (Spark dataframe) into a PCollection?
+dev@beam <d...@beam.apache.org> On 5/24/22 11:40, Jan Lukavský wrote: Hi, I think this feature is valid. Every runner for which Beam is not a 'native' SDK uses some form of translation context, which maps PCollection to internal representation of the particular SDK of the runner (RDD in this case). It should be possible to "import" an RDD into the specific runner via something like SparkRunner runner = ; PCollection<...> pCollection = runner.importRDD(rdd); and similarly RDD<...> rdd = runner.exportRDD(pCollection); Yes, apparently this would be runner specific, but that is the point, actually. This would enable using features and libraries, that Beam does not have, or micro-optimize some particular step using runner-specific features, that we don't have in Beam. We actually had this feature (at least in a prototype) many years ago when Euphoria was a separate project. Jan On 5/23/22 20:58, Alexey Romanenko wrote: On 23 May 2022, at 20:40, Brian Hulette wrote: Yeah I'm not sure of any simple way to do this. I wonder if it's worth considering building some Spark runner-specific feature around this, or at least packaging up Robert's proposed solution? I’m not sure that a runner specific feature is a good way to do this since the other runners won’t be able to support it or I’m missing something? There could be other interesting integrations in this space too, e.g. using Spark RDDs as a cache for Interactive Beam. Another option could be to add something like SparkIO (or FlinkIO/whatever) to read/write data from/to Spark data structures for such cases (Spark schema to Beam schema convention also could be supported). And dreaming a bit more, for those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors could support the push-downs of pure Spark pipelines and then use the result downstream in Beam. 
— Alexey Brian On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw wrote: The easiest way to do this would be to write the RDD somewhere then read it from Beam. On Mon, May 23, 2022 at 9:39 AM Yushu Yao wrote: > > Hi Folks, > > I know this is not the optimal way to use beam :-) But assume I only use the spark runner. > > I have a spark library (very complex) that emits a spark dataframe (or RDD). > I also have an existing complex beam pipeline that can do post processing on the data inside the dataframe. > > However, the beam part needs a pcollection to start with. The question is, how can I convert a spark RDD into a pcollection? > > Thanks > -Yushu >
Re: RDD (Spark dataframe) into a PCollection?
Hi, I think this feature is valid. Every runner for which Beam is not a 'native' SDK uses some form of translation context, which maps PCollection to internal representation of the particular SDK of the runner (RDD in this case). It should be possible to "import" an RDD into the specific runner via something like SparkRunner runner = ; PCollection<...> pCollection = runner.importRDD(rdd); and similarly RDD<...> rdd = runner.exportRDD(pCollection); Yes, apparently this would be runner specific, but that is the point, actually. This would enable using features and libraries, that Beam does not have, or micro-optimize some particular step using runner-specific features, that we don't have in Beam. We actually had this feature (at least in a prototype) many years ago when Euphoria was a separate project. Jan On 5/23/22 20:58, Alexey Romanenko wrote: On 23 May 2022, at 20:40, Brian Hulette wrote: Yeah I'm not sure of any simple way to do this. I wonder if it's worth considering building some Spark runner-specific feature around this, or at least packaging up Robert's proposed solution? I’m not sure that a runner specific feature is a good way to do this since the other runners won’t be able to support it or I’m missing something? There could be other interesting integrations in this space too, e.g. using Spark RDDs as a cache for Interactive Beam. Another option could be to add something like SparkIO (or FlinkIO/whatever) to read/write data from/to Spark data structures for such cases (Spark schema to Beam schema convention also could be supported). And dreaming a bit more, for those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors could support the push-downs of pure Spark pipelines and then use the result downstream in Beam. — Alexey Brian On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw wrote: The easiest way to do this would be to write the RDD somewhere then read it from Beam. 
On Mon, May 23, 2022 at 9:39 AM Yushu Yao wrote: > > Hi Folks, > > I know this is not the optimal way to use beam :-) But assume I only use the spark runner. > > I have a spark library (very complex) that emits a spark dataframe (or RDD). > I also have an existing complex beam pipeline that can do post processing on the data inside the dataframe. > > However, the beam part needs a pcollection to start with. The question is, how can I convert a spark RDD into a pcollection? > > Thanks > -Yushu >
Re: RDD (Spark dataframe) into a PCollection?
> On 23 May 2022, at 20:40, Brian Hulette wrote: > > Yeah I'm not sure of any simple way to do this. I wonder if it's worth > considering building some Spark runner-specific feature around this, or at > least packaging up Robert's proposed solution? I’m not sure that a runner specific feature is a good way to do this since the other runners won’t be able to support it or I’m missing something? > There could be other interesting integrations in this space too, e.g. using > Spark RDDs as a cache for Interactive Beam. Another option could be to add something like SparkIO (or FlinkIO/whatever) to read/write data from/to Spark data structures for such cases (Spark schema to Beam schema convention also could be supported). And dreaming a bit more, for those who need to have a mixed pipeline (e.g. Spark + Beam) such connectors could support the push-downs of pure Spark pipelines and then use the result downstream in Beam. — Alexey > > Brian > > On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw <rober...@google.com> wrote: > The easiest way to do this would be to write the RDD somewhere then > read it from Beam. > > On Mon, May 23, 2022 at 9:39 AM Yushu Yao <yao.yu...@gmail.com> wrote: > > > > Hi Folks, > > > > I know this is not the optimal way to use beam :-) But assume I only use > > the spark runner. > > > > I have a spark library (very complex) that emits a spark dataframe (or RDD). > > I also have an existing complex beam pipeline that can do post processing > > on the data inside the dataframe. > > > > However, the beam part needs a pcollection to start with. The question is, > > how can I convert a spark RDD into a pcollection? > > > > Thanks > > -Yushu > >
Re: RDD (Spark dataframe) into a PCollection?
Thanks Robert and Brian. As for "writing the RDD somewhere", I can totally write a bunch of files on disk/s3. Any other options? -Yushu On Mon, May 23, 2022 at 11:40 AM Brian Hulette wrote: > Yeah I'm not sure of any simple way to do this. I wonder if it's worth > considering building some Spark runner-specific feature around this, or at > least packaging up Robert's proposed solution? > > There could be other interesting integrations in this space too, e.g. > using Spark RDDs as a cache for Interactive Beam. > > Brian > > On Mon, May 23, 2022 at 11:35 AM Robert Bradshaw > wrote: > >> The easiest way to do this would be to write the RDD somewhere then >> read it from Beam. >> >> On Mon, May 23, 2022 at 9:39 AM Yushu Yao wrote: >> > >> > Hi Folks, >> > >> > I know this is not the optimal way to use beam :-) But assume I only >> use the spark runner. >> > >> > I have a spark library (very complex) that emits a spark dataframe (or >> RDD). >> > I also have an existing complex beam pipeline that can do post >> processing on the data inside the dataframe. >> > >> > However, the beam part needs a pcollection to start with. The question >> is, how can I convert a spark RDD into a pcollection? >> > >> > Thanks >> > -Yushu >> > >> >
Re: RDD (Spark dataframe) into a PCollection?
To add a bit more to what Robert suggested. Right, in general we can’t read a Spark RDD directly with Beam (the Spark runner uses RDDs under the hood but that’s a different story), but you can write the results to any storage and in any data format that Beam supports, and then read it with a corresponding Beam IO connector. — Alexey > On 23 May 2022, at 20:35, Robert Bradshaw wrote: > > The easiest way to do this would be to write the RDD somewhere then > read it from Beam. > > On Mon, May 23, 2022 at 9:39 AM Yushu Yao wrote: >> >> Hi Folks, >> >> I know this is not the optimal way to use beam :-) But assume I only use the >> spark runner. >> >> I have a spark library (very complex) that emits a spark dataframe (or RDD). >> I also have an existing complex beam pipeline that can do post processing on >> the data inside the dataframe. >> >> However, the beam part needs a pcollection to start with. The question is, >> how can I convert a spark RDD into a pcollection? >> >> Thanks >> -Yushu >>
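[Editor's note] The write-then-read bridge that Robert and Alexey describe can be sketched concretely. A non-runnable sketch: saveAsTextFile and TextIO.read are real Spark/Beam APIs, but the staging path and the assumption of text-serializable records are illustrative.

```java
// Spark side: materialize the RDD to a staging location.
JavaRDD<String> rdd = /* produced by the existing Spark library */;
rdd.saveAsTextFile("s3://bucket/staging/run-42");

// Beam side: read the staged files back as a PCollection and continue
// with the existing Beam post-processing pipeline.
Pipeline p = Pipeline.create(options);
PCollection<String> lines =
    p.apply(TextIO.read().from("s3://bucket/staging/run-42/part-*"));
```

For structured data, a columnar format (e.g. Parquet via Beam's ParquetIO) avoids hand-rolled string encoding; the same write-then-read shape applies.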
RDD (Spark dataframe) into a PCollection?
Hi Folks, I know this is not the optimal way to use beam :-) But assume I only use the spark runner. I have a spark library (very complex) that emits a spark dataframe (or RDD). I also have an existing complex beam pipeline that can do post processing on the data inside the dataframe. However, the beam part needs a pcollection to start with. The question is, how can I convert a spark RDD into a pcollection? Thanks -Yushu
Re: [PROPOSAL] Stop Spark 2 support in Spark Runner
https://spark.apache.org/releases/spark-release-3-0-0.html Since Spark 3 has been out almost 2 years, this seems increasingly reasonable. On Fri, Apr 29, 2022 at 4:04 AM Jean-Baptiste Onofré wrote: > +1, it makes sense to me. Users wanting "old" spark version can take > previous Beam releases. > > Regards > JB > > On Fri, Apr 29, 2022 at 12:39 PM Alexey Romanenko > wrote: > > > > Any objections or comments from Spark 2 users on this topic? > > > > — > > Alexey > > > > > > On 20 Apr 2022, at 19:17, Alexey Romanenko > wrote: > > > > Hi everyone, > > > > A while ago, we already discussed on dev@ that there are several > reasons to stop provide a support of Spark2 in Spark Runner (in all its > variants that we have for now - RDD, Dataset, Portable) [1]. In two words, > it brings some burden to Spark runner support that we would like to avoid > in the future. > > > > From the devs perspective I don’t see any objections about this. So, I’d > like to know if there are users that still uses Spark2 for their Beam > pipelines and it will be critical for them to keep using it. > > > > Please, share any your opinion on this! > > > > — > > Alexey > > > > [1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco > > > > > On 31 Mar 2022, at 17:51, Alexey Romanenko > wrote: > > > > > > Hi everyone, > > > > > > For the moment, Beam Spark Runner supports two versions of Spark - 2.x > and 3.x. > > > > > > Taking into account the several things that: > > > - almost all cloud providers already mostly moved to Spark 3.x as a > main supported version; > > > - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was > done almost a year ago; > > > - Spark 3 is considered as a mainstream Spark version for development > and bug fixing; > > > - better to avoid the burden of maintenance (there are some > incompatibilities between Spark 2 and 3) of two versions; > > > > > > I’d suggest to stop support Spark 2 for the Spark Runner in the one of > the next Beam releases. 
> > > > > > What are your thoughts on this? Are there any principal objections or > reasons for not doing this that I probably missed? > > > > > > — > > > Alexey > > > > > > >
Re: [PROPOSAL] Stop Spark 2 support in Spark Runner
+1, it makes sense to me. Users wanting "old" spark version can take previous Beam releases. Regards JB On Fri, Apr 29, 2022 at 12:39 PM Alexey Romanenko wrote: > > Any objections or comments from Spark 2 users on this topic? > > — > Alexey > > > On 20 Apr 2022, at 19:17, Alexey Romanenko wrote: > > Hi everyone, > > A while ago, we already discussed on dev@ that there are several reasons to > stop provide a support of Spark2 in Spark Runner (in all its variants that we > have for now - RDD, Dataset, Portable) [1]. In two words, it brings some > burden to Spark runner support that we would like to avoid in the future. > > From the devs perspective I don’t see any objections about this. So, I’d like > to know if there are users that still uses Spark2 for their Beam pipelines > and it will be critical for them to keep using it. > > Please, share any your opinion on this! > > — > Alexey > > [1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco > > > On 31 Mar 2022, at 17:51, Alexey Romanenko wrote: > > > > Hi everyone, > > > > For the moment, Beam Spark Runner supports two versions of Spark - 2.x and > > 3.x. > > > > Taking into account the several things that: > > - almost all cloud providers already mostly moved to Spark 3.x as a main > > supported version; > > - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was done > > almost a year ago; > > - Spark 3 is considered as a mainstream Spark version for development and > > bug fixing; > > - better to avoid the burden of maintenance (there are some > > incompatibilities between Spark 2 and 3) of two versions; > > > > I’d suggest to stop support Spark 2 for the Spark Runner in the one of the > > next Beam releases. > > > > What are your thoughts on this? Are there any principal objections or > > reasons for not doing this that I probably missed? > > > > — > > Alexey > > > >
[PROPOSAL] Stop Spark 2 support in Spark Runner
Any objections or comments from Spark 2 users on this topic? — Alexey On 20 Apr 2022, at 19:17, Alexey Romanenko wrote: Hi everyone, A while ago, we already discussed on dev@ that there are several reasons to stop provide a support of Spark2 in Spark Runner (in all its variants that we have for now - RDD, Dataset, Portable) [1]. In two words, it brings some burden to Spark runner support that we would like to avoid in the future. From the devs perspective I don’t see any objections about this. So, I’d like to know if there are users that still uses Spark2 for their Beam pipelines and it will be critical for them to keep using it. Please, share any your opinion on this! — Alexey [1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco > On 31 Mar 2022, at 17:51, Alexey Romanenko wrote: > > Hi everyone, > > For the moment, Beam Spark Runner supports two versions of Spark - 2.x and > 3.x. > > Taking into account the several things that: > - almost all cloud providers already mostly moved to Spark 3.x as a main > supported version; > - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was done > almost a year ago; > - Spark 3 is considered as a mainstream Spark version for development and bug > fixing; > - better to avoid the burden of maintenance (there are some incompatibilities > between Spark 2 and 3) of two versions; > > I’d suggest to stop support Spark 2 for the Spark Runner in the one of the > next Beam releases. > > What are your thoughts on this? Are there any principal objections or reasons > for not doing this that I probably missed? > > — > Alexey > >
Re: [PROPOSAL] Stop Spark2 support in Spark Runner
Hi everyone, A while ago, we already discussed on dev@ that there are several reasons to stop providing support for Spark 2 in the Spark Runner (in all its variants that we have for now - RDD, Dataset, Portable) [1]. In two words, it brings some burden to Spark runner support that we would like to avoid in the future. From the devs perspective I don’t see any objections about this. So, I’d like to know if there are users that still use Spark 2 for their Beam pipelines and for whom it will be critical to keep using it. Please, share any your opinion on this! — Alexey [1] https://lists.apache.org/thread/opfhg3xjb9nptv878sygwj9gjx38rmco > On 31 Mar 2022, at 17:51, Alexey Romanenko wrote: > > Hi everyone, > > For the moment, Beam Spark Runner supports two versions of Spark - 2.x and > 3.x. > > Taking into account the several things that: > - almost all cloud providers already mostly moved to Spark 3.x as a main > supported version; > - the latest Spark 2.x release (Spark 2.4.8, maintenance release) was done > almost a year ago; > - Spark 3 is considered as a mainstream Spark version for development and bug > fixing; > - better to avoid the burden of maintenance (there are some incompatibilities > between Spark 2 and 3) of two versions; > > I’d suggest to stop support Spark 2 for the Spark Runner in the one of the > next Beam releases. > > What are your thoughts on this? Are there any principal objections or reasons > for not doing this that I probably missed? > > — > Alexey > >
Re: Re: [Question] Spark: standard setup to use beam-spark to parallelize python code
We made some progress to parallelize our python code using beam-spark. Following your advice, we are using spark 3.2.1 The spark server and worker are connected ok. In a third machine, the client machine, I am running the docker jobserver: $ sudo docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://:7077 Then on the client: $ python test_beam.py If it matters, the code in test_beam.py has the following:

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--save_main_session",
    "--environment_type=DOCKER",
    "--environment_config=docker.io/apache/beam_python3.8_sdk:2.37.0"
])
with beam.Pipeline(options=options) as p:
    lines = (p | 'Create words' >> beam.Create(['this is working'])
               | 'Add hostname' >> beam.Map(lambda words: addhost(words))
               | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
               | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
               | 'Group' >> beam.GroupBy()  # Do future batching here
               | 'print output' >> beam.Map(myprint)
            )

I think I got the versions wrong because the server logs give (192.168.1.252 is the client IP): 22/04/04 11:51:57 DEBUG TransportServer: New connection accepted for remote address /192.168.1.252:53330. 22/04/04 11:51:57 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message. java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297 …. On the client logs, I got: 22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Staging artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27. 22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Resolving artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.ref_Environment_default_environment_1. 
22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Getting 1 artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.null. 22/04/04 11:51:56 INFO org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Artifacts fully staged for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27. 22/04/04 11:51:56 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038 22/04/04 11:51:56 INFO org.apache.beam.runners.jobsubmission.JobInvocation: Starting job invocation BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038 22/04/04 11:51:56 INFO org.apache.beam.runners.core.construction.resources.PipelineResources: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 6 files. Enable logging at DEBUG level to see which files will be staged. 22/04/04 11:51:56 INFO org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a brand new Spark Context. 22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Your hostname, spark-ml-client resolves to a loopback address: 127.0.0.1; using 192.168.1.252 instead (on interface eth0) 22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 22/04/04 11:51:57 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 22/04/04 11:52:57 ERROR org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 22/04/04 11:52:57 WARN org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application ID is not initialized yet. 
22/04/04 11:52:57 WARN org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
22/04/04 11:52:57 WARN org.apache.spark.metrics.MetricsSystem: Stopping a MetricsSystem that is not running
22/04/04 11:52:57 ERROR org.apache.spark.SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:64)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:248)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)
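The InvalidClassException in the server log above is the key symptom: a serialVersionUID mismatch on org.apache.spark.deploy.ApplicationDescription generally means the Spark client embedded in the Beam job server container was built against a different Spark release than the standalone master it connects to, so the two versions have to be aligned. Purely as an illustration (the helper name and regex below are my own, not part of Beam or Spark), a few lines of Python can pull the two UIDs out of such a log line and confirm they differ:

```python
import re

# Hypothetical helper: given a Spark InvalidClassException log line,
# report whether the stream (client-side) and local (cluster-side)
# serialVersionUIDs differ.  A mismatch usually indicates the Beam job
# server and the standalone cluster run different Spark versions.
UID_RE = re.compile(
    r"stream classdesc serialVersionUID = (-?\d+), "
    r"local class serialVersionUID = (-?\d+)")

def spark_version_mismatch(log_line: str) -> bool:
    m = UID_RE.search(log_line)
    if m is None:
        return False  # not an InvalidClassException line
    stream_uid, local_uid = m.groups()
    return stream_uid != local_uid

line = ("java.io.InvalidClassException: "
        "org.apache.spark.deploy.ApplicationDescription; "
        "local class incompatible: stream classdesc serialVersionUID = "
        "6543101073799644159, local class serialVersionUID = "
        "1574364215946805297")
print(spark_version_mismatch(line))  # True -> client and cluster Spark differ
```

If the UIDs differ, the usual remedy is to pick the job server image (or Spark distribution) whose Spark minor version matches the cluster's, rather than patching either side.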
Re: [Question] Spark: standard setup to use beam-spark to parallelize python code
> On 28 Mar 2022, at 20:58, Mihai Alexe wrote:
>
> > the jackson runtime dependencies should be updated manually (at least to 2.9.2) in case of using Spark 2.x
>
> yes - that is exactly what we are looking to achieve, any hints about how to do that? We’re not Java experts. Do you happen to have a CI recipe or binary list for this particular configuration? Thank you!

For our testing pipelines, which run on Spark 2.4.7, we just build them with a recent version of the jackson libs [1]. Though, IIUC, you don’t build any Java code. So, what actually is the issue that you are facing?

> > use Spark 3.x if possible since it already provides jackson jars of version 2.10.0.
>
> we tried this too but ran into other compatibility problems. Seems that the Beam Spark runner (in v 2.37.0) only supports the Spark 2.x branch, as per the Beam docs https://beam.apache.org/documentation/runners/spark/

Which Spark 3.x version exactly did you try? AFAICT, Beam 2.37.0 supports and was tested with Spark 3.1.2 / Scala 2.12 artifacts.

—
Alexey

[1] https://github.com/Talend/beam-samples/blob/9288606495b9ba8f77383cd9709ed9b5783deeb8/pom.xml#L66

> any ideas?
>
> On 2022/03/28 17:38:13 Alexey Romanenko wrote:
> > Well, it’s caused by the recent jackson version update in Beam [1] - so, the jackson runtime dependencies should be updated manually (at least to 2.9.2) in case of using Spark 2.x.
> >
> > Either, use Spark 3.x if possible since it already provides jackson jars of version 2.10.0.
> >
> > [1] https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c
> >
> > —
> > Alexey
> >
> > > On 28 Mar 2022, at 14:15, Florian Pinault <fl...@ecmwf.int> wrote:
> > >
> > > Greetings,
> > >
> > > We are setting up an Apache Beam cluster using Spark as a backend to run python code. This is currently a toy example with 4 virtual machines running CentOS (a client, a spark main, and two spark-workers).
> > > We are running into version issues (detail below) and would need help on which versions to set up.
> > > We are currently trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 2.37.0 on the client, and using a job-server to create the docker image.
> > >
> > > I saw here https://beam.apache.org/blog/beam-2.33.0/ that "Spark 2.x users will need to update Spark's Jackson runtime dependencies (spark.jackson.version) to at least version 2.9.2, due to Beam updating its dependencies."
> > > But it looks like the jackson-core version in the job-server is 2.13.0, whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are:
> > > -. 1 mluser mluser   46986 May 8 2021 jackson-annotations-2.6.7.jar
> > > -. 1 mluser mluser  258919 May 8 2021 jackson-core-2.6.7.jar
> > > -. 1 mluser mluser  232248 May 8 2021 jackson-core-asl-1.9.13.jar
> > > -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> > > -. 1 mluser mluser  320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> > > -. 1 mluser mluser   18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> > > -. 1 mluser mluser  780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> > > -. 1 mluser mluser   32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
> > > -. 1 mluser mluser   42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> > > -. 1 mluser mluser  515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
> > >
> > > There must be something to update, but I am not sure how to update these jar files with their dependencies, and not sure if this would get us very far.
> > >
> > > Would you have a list of binaries that work together, or some running CI from the Apache foundation similar to what we are trying to achieve?
RE: Re: [Question] Spark: standard setup to use beam-spark to parallelize python code
* the jackson runtime dependencies should be updated manually (at least to 2.9.2) in case of using Spark 2.x

yes - that is exactly what we are looking to achieve, any hints about how to do that? We’re not Java experts. Do you happen to have a CI recipe or binary list for this particular configuration? Thank you!

* use Spark 3.x if possible since it already provides jackson jars of version 2.10.0.

we tried this too but ran into other compatibility problems. Seems that the Beam Spark runner (in v 2.37.0) only supports the Spark 2.x branch, as per the Beam docs https://beam.apache.org/documentation/runners/spark/

any ideas?

On 2022/03/28 17:38:13 Alexey Romanenko wrote:
> Well, it’s caused by the recent jackson version update in Beam [1] - so, the jackson runtime dependencies should be updated manually (at least to 2.9.2) in case of using Spark 2.x.
>
> Either, use Spark 3.x if possible since it already provides jackson jars of version 2.10.0.
>
> [1] https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c
>
> —
> Alexey
>
> > On 28 Mar 2022, at 14:15, Florian Pinault <fl...@ecmwf.int> wrote:
> >
> > Greetings,
> >
> > We are setting up an Apache Beam cluster using Spark as a backend to run python code. This is currently a toy example with 4 virtual machines running CentOS (a client, a spark main, and two spark-workers).
> > We are running into version issues (detail below) and would need help on which versions to set up.
> > We are currently trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 2.37.0 on the client, and using a job-server to create the docker image.
> >
> > I saw here https://beam.apache.org/blog/beam-2.33.0/ that "Spark 2.x users will need to update Spark's Jackson runtime dependencies (spark.jackson.version) to at least version 2.9.2, due to Beam updating its dependencies."
> > But it looks like the jackson-core version in the job-server is 2.13.0, whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are:
> > -. 1 mluser mluser   46986 May 8 2021 jackson-annotations-2.6.7.jar
> > -. 1 mluser mluser  258919 May 8 2021 jackson-core-2.6.7.jar
> > -. 1 mluser mluser  232248 May 8 2021 jackson-core-asl-1.9.13.jar
> > -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> > -. 1 mluser mluser  320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> > -. 1 mluser mluser   18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> > -. 1 mluser mluser  780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> > -. 1 mluser mluser   32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
> > -. 1 mluser mluser   42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> > -. 1 mluser mluser  515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
> >
> > There must be something to update, but I am not sure how to update these jar files with their dependencies, and not sure if this would get us very far.
> >
> > Would you have a list of binaries that work together, or some running CI from the Apache foundation similar to what we are trying to achieve?
Re: [Question] Spark: standard setup to use beam-spark to parallelize python code
Well, it’s caused by the recent jackson version update in Beam [1] - so, the jackson runtime dependencies should be updated manually (at least to 2.9.2) in case of using Spark 2.x.

Either, use Spark 3.x if possible since it already provides jackson jars of version 2.10.0.

[1] https://github.com/apache/beam/commit/9694f70df1447e96684b665279679edafec13a0c

—
Alexey

> On 28 Mar 2022, at 14:15, Florian Pinault wrote:
>
> Greetings,
>
> We are setting up an Apache Beam cluster using Spark as a backend to run python code. This is currently a toy example with 4 virtual machines running CentOS (a client, a spark main, and two spark-workers).
> We are running into version issues (detail below) and would need help on which versions to set up.
> We are currently trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 2.37.0 on the client, and using a job-server to create the docker image.
>
> I saw here https://beam.apache.org/blog/beam-2.33.0/ that "Spark 2.x users will need to update Spark's Jackson runtime dependencies (spark.jackson.version) to at least version 2.9.2, due to Beam updating its dependencies."
> But it looks like the jackson-core version in the job-server is 2.13.0, whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are:
> -. 1 mluser mluser   46986 May 8 2021 jackson-annotations-2.6.7.jar
> -. 1 mluser mluser  258919 May 8 2021 jackson-core-2.6.7.jar
> -. 1 mluser mluser  232248 May 8 2021 jackson-core-asl-1.9.13.jar
> -. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
> -. 1 mluser mluser  320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
> -. 1 mluser mluser   18336 May 8 2021 jackson-jaxrs-1.9.13.jar
> -. 1 mluser mluser  780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
> -. 1 mluser mluser   32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
> -. 1 mluser mluser   42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
> -. 1 mluser mluser  515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar
>
> There must be something to update, but I am not sure how to update these jar files with their dependencies, and not sure if this would get us very far.
>
> Would you have a list of binaries that work together, or some running CI from the Apache foundation similar to what we are trying to achieve?
[Question] Spark: standard setup to use beam-spark to parallelize python code
Greetings,

We are setting up an Apache Beam cluster using Spark as a backend to run python code. This is currently a toy example with 4 virtual machines running CentOS (a client, a spark main, and two spark-workers).
We are running into version issues (detail below) and would need help on which versions to set up. We are currently trying spark-2.4.8-bin-hadoop2.7, with the pip package beam 2.37.0 on the client, and using a job-server to create the docker image.

I saw here https://beam.apache.org/blog/beam-2.33.0/ that "Spark 2.x users will need to update Spark's Jackson runtime dependencies (spark.jackson.version) to at least version 2.9.2, due to Beam updating its dependencies."
But it looks like the jackson-core version in the job-server is 2.13.0, whereas the jars in spark-2.4.8-bin-hadoop2.7/jars are:

-. 1 mluser mluser   46986 May 8 2021 jackson-annotations-2.6.7.jar
-. 1 mluser mluser  258919 May 8 2021 jackson-core-2.6.7.jar
-. 1 mluser mluser  232248 May 8 2021 jackson-core-asl-1.9.13.jar
-. 1 mluser mluser 1166637 May 8 2021 jackson-databind-2.6.7.3.jar
-. 1 mluser mluser  320444 May 8 2021 jackson-dataformat-yaml-2.6.7.jar
-. 1 mluser mluser   18336 May 8 2021 jackson-jaxrs-1.9.13.jar
-. 1 mluser mluser  780664 May 8 2021 jackson-mapper-asl-1.9.13.jar
-. 1 mluser mluser   32612 May 8 2021 jackson-module-jaxb-annotations-2.6.7.jar
-. 1 mluser mluser   42858 May 8 2021 jackson-module-paranamer-2.7.9.jar
-. 1 mluser mluser  515645 May 8 2021 jackson-module-scala_2.11-2.6.7.1.jar

There must be something to update, but I am not sure how to update these jar files with their dependencies, and not sure if this would get us very far.

Would you have a list of binaries that work together, or some running CI from the Apache foundation similar to what we are trying to achieve?
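To see at a glance which Jackson artifacts in a Spark distribution fall below the 2.9.2 minimum mentioned in the thread, a short script can scan the jars directory. This is only an illustrative sketch (the helper name, the directory path, and using 2.9.2 as the floor are assumptions for the example, not an official Beam tool):

```python
import os
import re

# Minimal sketch: scan a Spark distribution's jars/ directory and flag
# jackson-* jars whose version is older than a given minimum.
# jackson filename pattern: "<name>-<dotted version>.jar"
JACKSON_RE = re.compile(r"^(jackson-[a-z0-9_.-]+?)-(\d+(?:\.\d+)+)\.jar$")

def outdated_jackson_jars(jars_dir, minimum=(2, 9, 2)):
    flagged = []
    for name in sorted(os.listdir(jars_dir)):
        m = JACKSON_RE.match(name)
        if not m:
            continue  # not a jackson jar
        version = tuple(int(p) for p in m.group(2).split("."))
        # Codehaus-era 1.x jars (e.g. jackson-core-asl) are flagged too,
        # since any 1.x tuple compares below (2, 9, 2).
        if version < minimum:
            flagged.append(name)
    return flagged

# Example (path is illustrative):
# print(outdated_jackson_jars("spark-2.4.8-bin-hadoop2.7/jars"))
```

Run against the spark-2.4.8-bin-hadoop2.7/jars listing above, this would flag every 1.9.x and 2.6.x/2.7.x jackson jar, which matches the advice to either upgrade those jars or move to a Spark 3.x distribution that already ships Jackson 2.10.0.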
Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark
Thank you for the quick answers, Utkarsh, but unfortunately, I don’t see the real cause of this right now. It seems like it will require some remote debugging on your side to see what the workers are actually doing.

> On 1 Feb 2022, at 22:59, Utkarsh Parekh wrote:
>
> If you tested earlier with the same stack, which version did you use?
>
> Can you enable debug logs to check what’s happening there? So far the following warning is all I received from log4j on Databricks (no errors other than that).
>
> Can you make sure that there is no issue with firewall or something? No, I don't think so, because it's working fine locally and in a Databricks notebook.
>
> Can you run this pipeline locally against a real Kafka server, not Azure Event Hub, to make sure that it works fine? Yes, it's working fine with both Azure EventHub and Kafka.
>
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
> at org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
> at org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
> at org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
> at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
> at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
> at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
> at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
> at org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
> at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
> at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
> at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark
And I also get this error occasionally when I execute a streaming pipeline with a new cluster instead of an existing cluster:
https://issues.apache.org/jira/browse/BEAM-12032?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

On Tue, Feb 1, 2022 at 1:59 PM Utkarsh Parekh wrote:

> If you tested earlier with the same stack, which version did you use?
>
> *Can you enable debug logs to check what’s happening there?* So far the following warning is all I received from log4j on Databricks (no errors other than that).
>
> *Can you make sure that there is no issue with firewall or something?* No, I don't think so, because it's working fine locally and in a Databricks notebook.
>
> *Can you run this pipeline locally against a real Kafka server, not Azure Event Hub, to make sure that it works fine?* Yes, it's working fine with both Azure EventHub and Kafka.
>
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
> at org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
> at org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
> at org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
> at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
> at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
> at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
> at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
> at org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
> at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
> at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
> at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultT
Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark
If you tested earlier with the same stack, which version did you use?

*Can you enable debug logs to check what’s happening there?* So far the following warning is all I received from log4j on Databricks (no errors other than that).

*Can you make sure that there is no issue with firewall or something?* No, I don't think so, because it's working fine locally and in a Databricks notebook.

*Can you run this pipeline locally against a real Kafka server, not Azure Event Hub, to make sure that it works fine?* Yes, it's working fine with both Azure EventHub and Kafka.

org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
at org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
at org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
at org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
at org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:93)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run
Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark
Well, personally I didn’t test with this version, but it should be fine…

Can you enable debug logs to check what’s happening there?
Can you make sure that there is no issue with a firewall or something?
Can you run this pipeline locally against a real Kafka server, not Azure Event Hub, to make sure that it works fine?

Otherwise, it would need remote debugging of the worker process.

> On 1 Feb 2022, at 19:18, Utkarsh Parekh wrote:
>
> Sorry, I sent the last message in a hurry. Here is the Beam Java-to-Kafka dependency: Is something missing here?
>
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.35.0</version>
> </dependency>
>
> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
> Here it is:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>2.8.0</version>
> </dependency>
>
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko <aromanenko@gmail.com> wrote:
> Hmm, this is strange. Which version of the Kafka client do you use while running it with Beam?
>
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
>>
>> Hi Alexey,
>>
>> First of all, thank you for the response! Yes, I did have it in the consumer configuration and tried to increase "session.timeout".
>>
>> From the consumer side, so far I have the following settings:
>>
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms", 6);
>> props.put("session.timeout.ms", 6);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>
>> It works fine using the following code in a Databricks notebook. The problem has been occurring when I run it through Apache Beam and KafkaIO (just providing more context if that may help you understand the problem):
>>
>> val df = spark.readStream
>>   .format("kafka")
>>   .option("subscribe", TOPIC)
>>   .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>   .option("kafka.sasl.mechanism", "PLAIN")
>>   .option("kafka.security.protocol", "SASL_SSL")
>>   .option("kafka.sasl.jaas.config", EH_SASL)
>>   .option("kafka.request.timeout.ms", "6")
>>   .option("kafka.session.timeout.ms", "6")
>>   .option("failOnDataLoss", "false")
>>   //.option("kafka.group.id", "testsink")
>>   .option("startingOffsets", "latest")
>>   .load()
>>
>> Utkarsh
>>
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <aromanenko@gmail.com> wrote:
>> Hi Utkarsh,
>>
>> Can it be related to this configuration problem?
>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>
>> Did you check timeout settings?
>>
>> —
>> Alexey
>>
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I'm doing a POC with KafkaIO and the Spark runner on Azure Databricks. I'm trying to create a simple streaming app with Apache Beam, where it reads data from an Azure event hub and produces messages into another Azure event hub.
>>>
>>> I'm creating and running Spark jobs on Azure Databricks.
>>>
>>> The problem is that the consumer (using SparkRunner) is not able to read data from the Event hub (queue). There is no activity and no errors on the Spark cluster.
>>>
>>> I would appreciate it if anyone could help fix this issue.
>>>
>>> Thank you
>>>
>>> Utkarsh
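For reference, the Java consumer properties quoted in this thread map onto a plain config dictionary that can be sanity-checked before handing it to KafkaIO. The sketch below is illustrative only (the function names and the set of required keys are my own; the 9093 port is the Kafka-compatible endpoint Event Hubs documents):

```python
# Illustrative sketch: build an Azure Event Hubs Kafka consumer config
# mirroring the Java props from the thread, and check that the keys the
# Event Hubs Kafka endpoint requires (SASL_SSL + PLAIN) are all present.
# Names here are made up for the example, not a Beam or Kafka API.
REQUIRED_KEYS = {
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
}

def build_eventhubs_consumer_config(namespace: str, sasl_jaas_config: str) -> dict:
    # Event Hubs exposes its Kafka-compatible endpoint on port 9093.
    return {
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": sasl_jaas_config,
        "auto.offset.reset": "latest",
    }

def missing_required_keys(config: dict) -> set:
    """Return the set of required keys absent from the config."""
    return REQUIRED_KEYS - config.keys()

cfg = build_eventhubs_consumer_config("mynamespace", "<jaas config here>")
print(missing_required_keys(cfg))  # set() -> nothing missing
```

A check like this catches the common failure mode where the pipeline silently reads nothing because one of the SASL settings never reached the consumer.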
Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark
Sorry, I sent the last message in a hurry. Here is the Beam Java-to-Kafka dependency: Is something missing here?

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.35.0</version>
</dependency>

On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh wrote:

> Here it is:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>2.8.0</version>
> </dependency>
>
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko wrote:
>
>> Hmm, this is strange. Which version of the Kafka client do you use while running it with Beam?
>>
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh wrote:
>>
>> Hi Alexey,
>>
>> First of all, thank you for the response! Yes, I did have it in the consumer configuration and tried to increase "session.timeout".
>>
>> From the consumer side, so far I have the following settings:
>>
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms", 6);
>> props.put("session.timeout.ms", 6);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>
>> It works fine using the following code in a Databricks notebook. The problem has been occurring when I run it through Apache Beam and KafkaIO (just providing more context if that may help you understand the problem):
>>
>> val df = spark.readStream
>>   .format("kafka")
>>   .option("subscribe", TOPIC)
>>   .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>   .option("kafka.sasl.mechanism", "PLAIN")
>>   .option("kafka.security.protocol", "SASL_SSL")
>>   .option("kafka.sasl.jaas.config", EH_SASL)
>>   .option("kafka.request.timeout.ms", "6")
>>   .option("kafka.session.timeout.ms", "6")
>>   .option("failOnDataLoss", "false")
>>   //.option("kafka.group.id", "testsink")
>>   .option("startingOffsets", "latest")
>>   .load()
>>
>> Utkarsh
>>
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko wrote:
>>
>>> Hi Utkarsh,
>>>
>>> Can it be related to this configuration problem?
>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>
>>> Did you check timeout settings?
>>>
>>> —
>>> Alexey
>>>
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh wrote:
>>>
>>> Hello,
>>>
>>> I'm doing a POC with KafkaIO and the Spark runner on Azure Databricks. I'm trying to create a simple streaming app with Apache Beam, where it reads data from an Azure event hub and produces messages into another Azure event hub.
>>>
>>> I'm creating and running Spark jobs on Azure Databricks.
>>>
>>> The problem is that the consumer (using SparkRunner) is not able to read data from the Event hub (queue). There is no activity and no errors on the Spark cluster.
>>>
>>> I would appreciate it if anyone could help fix this issue.
>>>
>>> Thank you
>>>
>>> Utkarsh
Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark
Here it is org.apache.kafka kafka-clients 2.8.0 On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko wrote: > Hmm, this is strange. Which version of Kafka client do you use while > running it with Beam? > > On 1 Feb 2022, at 16:56, Utkarsh Parekh > wrote: > > Hi Alexey, > > First of all, thank you for the response! Yes I did have it in Consumer > configuration and try to increase "session.timeout". > > From consumer side so far I've following settings: > > props.put("sasl.mechanism", SASL_MECHANISM); > props.put("security.protocol", SECURITY_PROTOCOL); > props.put("sasl.jaas.config", saslJaasConfig); > props.put("request.timeout.ms", 6); > props.put("session.timeout.ms", 6); > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); > props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); > > > It works fine using following code in Databricks Notebook. The problem has > been occurring when I run it through Apache beam and KafkaIO (Just > providing more context if that may help you to understand problem) > > val df = spark.readStream > .format("kafka") > .option("subscribe", TOPIC) > .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) > .option("kafka.sasl.mechanism", "PLAIN") > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.jaas.config", EH_SASL) > .option("kafka.request.timeout.ms", "6") > .option("kafka.session.timeout.ms", "6") > .option("failOnDataLoss", "false") > //.option("kafka.group.id", "testsink") > .option("startingOffsets", "latest") > .load() > > Utkarsh > > On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko > wrote: > >> Hi Utkarsh, >> >> Can it be related to this configuration problem? >> >> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received >> >> Did you check timeout settings? >> >> — >> Alexey >> >> >> On 1 Feb 2022, at 02:27, Utkarsh Parekh >> wrote: >> >> Hello, >> >> I'm doing POC with KafkaIO and spark runner on Azure Databricks. 
I'm >> trying to create a simple streaming app with Apache Beam, where it reads >> data from an Azure event hub and produces messages into another Azure event >> hub. >> >> I'm creating and running spark jobs on Azure Databricks. >> >> The problem is the consumer (uses SparkRunner) is not able to read data >> from Event hub (queue). There is no activity and no errors on the Spark >> cluster. >> >> I would appreciate it if anyone could help to fix this issue. >> >> Thank you >> >> Utkarsh >> >> >> >
Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark
Hmm, this is strange. Which version of Kafka client do you use while running it with Beam? > On 1 Feb 2022, at 16:56, Utkarsh Parekh wrote: > > Hi Alexey, > > First of all, thank you for the response! Yes I did have it in Consumer > configuration and try to increase "session.timeout". > > From consumer side so far I've following settings: > props.put("sasl.mechanism", SASL_MECHANISM); > props.put("security.protocol", SECURITY_PROTOCOL); > props.put("sasl.jaas.config", saslJaasConfig); > props.put("request.timeout.ms <http://request.timeout.ms/>", 6); > props.put("session.timeout.ms <http://session.timeout.ms/>", 6); > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); > props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); > > It works fine using following code in Databricks Notebook. The problem has > been occurring when I run it through Apache beam and KafkaIO (Just providing > more context if that may help you to understand problem) > > val df = spark.readStream > .format("kafka") > .option("subscribe", TOPIC) > .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) > .option("kafka.sasl.mechanism", "PLAIN") > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.jaas.config", EH_SASL) > .option("kafka.request.timeout.ms <http://kafka.request.timeout.ms/>", > "6") > .option("kafka.session.timeout.ms <http://kafka.session.timeout.ms/>", > "6") > .option("failOnDataLoss", "false") > //.option("kafka.group.id <http://kafka.group.id/>", "testsink") > .option("startingOffsets", "latest") > .load() > > Utkarsh > > On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <mailto:aromanenko@gmail.com>> wrote: > Hi Utkarsh, > > Can it be related to this configuration problem? > https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received > > <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received> > > Did you check timeout settings? 
> > — > Alexey > > >> On 1 Feb 2022, at 02:27, Utkarsh Parekh > <mailto:utkarsh.s.par...@gmail.com>> wrote: >> >> Hello, >> >> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying >> to create a simple streaming app with Apache Beam, where it reads data from >> an Azure event hub and produces messages into another Azure event hub. >> >> I'm creating and running spark jobs on Azure Databricks. >> >> The problem is the consumer (uses SparkRunner) is not able to read data from >> Event hub (queue). There is no activity and no errors on the Spark cluster. >> >> I would appreciate it if anyone could help to fix this issue. >> >> Thank you >> >> Utkarsh >
Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark
Hi Alexey, First of all, thank you for the response! Yes I did have it in Consumer configuration and try to increase "session.timeout". >From consumer side so far I've following settings: props.put("sasl.mechanism", SASL_MECHANISM); props.put("security.protocol", SECURITY_PROTOCOL); props.put("sasl.jaas.config", saslJaasConfig); props.put("request.timeout.ms", 6); props.put("session.timeout.ms", 6); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); It works fine using following code in Databricks Notebook. The problem has been occurring when I run it through Apache beam and KafkaIO (Just providing more context if that may help you to understand problem) val df = spark.readStream .format("kafka") .option("subscribe", TOPIC) .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) .option("kafka.sasl.mechanism", "PLAIN") .option("kafka.security.protocol", "SASL_SSL") .option("kafka.sasl.jaas.config", EH_SASL) .option("kafka.request.timeout.ms", "6") .option("kafka.session.timeout.ms", "6") .option("failOnDataLoss", "false") //.option("kafka.group.id", "testsink") .option("startingOffsets", "latest") .load() Utkarsh On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko wrote: > Hi Utkarsh, > > Can it be related to this configuration problem? > > https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received > > Did you check timeout settings? > > — > Alexey > > > On 1 Feb 2022, at 02:27, Utkarsh Parekh > wrote: > > Hello, > > I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm > trying to create a simple streaming app with Apache Beam, where it reads > data from an Azure event hub and produces messages into another Azure event > hub. > > I'm creating and running spark jobs on Azure Databricks. > > The problem is the consumer (uses SparkRunner) is not able to read data > from Event hub (queue). 
There is no activity and no errors on the Spark > cluster. > > I would appreciate it if anyone could help to fix this issue. > > Thank you > > Utkarsh > > >
Re: Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark
Hi Utkarsh, Can it be related to this configuration problem? https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received <https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received> Did you check timeout settings? — Alexey > On 1 Feb 2022, at 02:27, Utkarsh Parekh wrote: > > Hello, > > I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying > to create a simple streaming app with Apache Beam, where it reads data from > an Azure event hub and produces messages into another Azure event hub. > > I'm creating and running spark jobs on Azure Databricks. > > The problem is the consumer (uses SparkRunner) is not able to read data from > Event hub (queue). There is no activity and no errors on the Spark cluster. > > I would appreciate it if anyone could help to fix this issue. > > Thank you > > Utkarsh
Running small app using Apache Beam, KafkaIO, Azure EventHuband Databricks Spark
Hello, I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm trying to create a simple streaming app with Apache Beam, where it reads data from an Azure event hub and produces messages into another Azure event hub. I'm creating and running spark jobs on Azure Databricks. The problem is the consumer (uses SparkRunner) is not able to read data from Event hub (queue). There is no activity and no errors on the Spark cluster. I would appreciate it if anyone could help to fix this issue. Thank you Utkarsh
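[Editor's note] The timeout values quoted throughout this thread appear as "6", which looks like truncation in the archive; Kafka's `request.timeout.ms` and `session.timeout.ms` are expressed in milliseconds. The sketch below (plain Python, no Beam dependency) shows the shape of a consumer config for Event Hubs' Kafka endpoint. The namespace, connection string, and 60000 ms timeouts are illustrative assumptions, not values recovered from the thread.

```python
# Hypothetical helper: build a KafkaIO-style consumer config dict for
# Azure Event Hubs. All concrete values here are illustrative.

def event_hubs_consumer_config(namespace, connection_string,
                               request_timeout_ms=60000,
                               session_timeout_ms=60000):
    """Return a consumer config dict for Event Hubs' Kafka endpoint.

    Event Hubs requires SASL_SSL with the PLAIN mechanism; both
    timeouts are in milliseconds.
    """
    sasl_jaas = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="$ConnectionString" password="%s";' % connection_string
    )
    return {
        "bootstrap.servers": "%s.servicebus.windows.net:9093" % namespace,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": sasl_jaas,
        # Milliseconds; overly small values produce the silent
        # "no records received" symptom described in this thread.
        "request.timeout.ms": str(request_timeout_ms),
        "session.timeout.ms": str(session_timeout_ms),
        "auto.offset.reset": "latest",
    }
```

A dict like this is what would be passed as the consumer properties when wiring up KafkaIO against an Event Hubs namespace.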
Re: Compatibility of Spark portable runner
Generally speaking, to avoid potential issues, the versions used at compile time and at runtime should be the same (the Scala version is the most important), but, due to Spark's backward compatibility, minor versions can differ. > On 5 Jan 2022, at 07:50, Zheng Ni wrote: > > Hi Beam Community, > > Greetings. > > I am interested in submitting a spark job through portable runner. I have a > question about the compatibility between spark_job_server and spark cluster. > > Let’s say I am going to use beam_spark_job_server of version 2.35.0. How > could I know which spark cluster version is compatible with it? Or could it > work with any versions of the spark cluster? > > Regards, > Zheng >
Compatibility of Spark portable runner
Hi Beam Community, Greetings. I am interested in submitting a spark job through portable runner. I have a question about the compatibility between spark_job_server and spark cluster. Let’s say I am going to use beam_spark_job_server of version 2.35.0. How could I know which spark cluster version is compatible with it? Or could it work with any versions of the spark cluster? Regards, Zheng
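[Editor's note] The rule of thumb given in the reply above (compile-time and runtime versions should match, the Scala binary version matters most, and Spark minor versions can differ) can be sketched as a small check. This helper is hypothetical, not part of Beam; it only encodes the heuristic from this thread.

```python
# Hypothetical compatibility heuristic: the Spark major version and the
# Scala binary version must match between the Beam job server and the
# cluster; Spark minor/patch versions may differ thanks to Spark's
# backward compatibility.

def compatible(job_server_spark, job_server_scala,
               cluster_spark, cluster_scala):
    """Compare e.g. ("3.1.2", "2.12") against ("3.2.1", "2.12")."""
    same_major = (job_server_spark.split(".")[0]
                  == cluster_spark.split(".")[0])
    same_scala = job_server_scala == cluster_scala
    return same_major and same_scala
```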
Re: Perf issue with Beam on spark (spark runner)
Robert, Do you have any numbers by chance regarding this optimisation? Alexey > On 5 Oct 2021, at 00:27, Robert Bradshaw wrote: > > https://github.com/apache/beam/pull/15637 > <https://github.com/apache/beam/pull/15637> might help some. > > On Thu, Sep 9, 2021 at 5:21 PM Tao Li <mailto:t...@zillow.com>> wrote: > Thanks Mike for this info! > > > > From: Mike Kaplinskiy mailto:m...@ladderlife.com>> > Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" > mailto:user@beam.apache.org>> > Date: Tuesday, September 7, 2021 at 2:15 PM > To: "user@beam.apache.org <mailto:user@beam.apache.org>" > mailto:user@beam.apache.org>> > Cc: Alexey Romanenko <mailto:aromanenko@gmail.com>>, Andrew Pilloud <mailto:apill...@google.com>>, Ismaël Mejía <mailto:ieme...@gmail.com>>, Kyle Weaver <mailto:kcwea...@google.com>>, Yuchu Cao <mailto:yuc...@trulia.com>> > Subject: Re: Perf issue with Beam on spark (spark runner) > > > > A long time ago when I was experimenting with the Spark runner for a batch > job, I noticed that a lot of time was spend in GC as well. In my case I > narrowed it down to how the Spark runner implements Coders. > > > > Spark's value prop is that it only serializes data when it truly has no other > choice - i.e. when it needs to reclaim memory or when it sends things over > the wire. Unfortunately due to the mismatch in serialization APIs between > Beam and Spark, Beam's Spark runner actually just serializes things all the > time. My theory was that the to/from byte array dance was slow. 
I attempted > to fix this at https://github.com/apache/beam/pull/8371 > but I could never actually reproduce a speedup in performance benchmarks. > > If you're feeling up to it, you could try reviving something like that PR and > see if it helps. > > Mike. > > Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life. > > > > > > On Sat, Aug 14, 2021 at 4:35 PM Tao Li <mailto:t...@zillow.com>> wrote: > > @Alexey Romanenko <mailto:aromanenko@gmail.com> I tried out ParquetIO > splittable and the processing time improved from 10 min to 6 min, but still > much longer than 2 min using a native spark app. > > > > We are still seeing a lot of GC cost from below call stack. Do you think this > ticket can fix this issue https://issues.apache.org/jira/browse/BEAM-12646 > ? Thanks. 
> > > > > > > > > > > > From: Tao Li mailto:t...@zillow.com>> > Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" > mailto:user@beam.apache.org>> > Date: Friday, August 6, 2021 at 11:12 AM > To: Alexey Romanenko <mailto:aromanenko@gmail.com>> > Cc: "user@beam.apache.org <mailto:user@beam.apache.org>" > mailto:user@beam.apache.org>>, Andrew Pilloud > mailto:apill...@google.com>>, Ismaël Mejía > mailto:ieme...@gmail.com>>, Kyle Weaver > mailto:kcwea...@google.com>>, Yuchu Cao > mailto:yuc...@trulia.com>> > Subject: Re: Perf issue with Beam on spark (spark runner) > > > > Thanks @Alexey Romanenko <mailto:aromanenko@gmail.com> please see my > clarifications below. > > > > > > | “Well, of course, if you read all fields (columns) then you don’t need > column projection. Otherwise, it can give a quite significant performance > boost, especially for large tables wi
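[Editor's note] The diagnosis in this thread is that the Spark runner eagerly round-trips every element through Beam coders (to and from byte arrays), whereas native Spark serializes only when it must. The sketch below is a conceptual stand-in, not runner code: it uses `pickle` in place of a Beam coder to show why a per-element encode/decode pass adds both CPU time and per-element allocations (the GC pressure reported above).

```python
import pickle
import timeit

def process_direct(records):
    # Native-Spark-like path: operate on in-memory objects directly.
    return sum(r["value"] for r in records)

def process_roundtrip(records):
    # Spark-runner-like path: encode/decode each element around the
    # operation, analogous to the coder byte-array dance.
    total = 0
    for r in records:
        r = pickle.loads(pickle.dumps(r))  # fresh object per element
        total += r["value"]
    return total

records = [{"key": i, "value": i % 7} for i in range(10000)]

# Both paths compute the same result; only the serialization differs.
assert process_direct(records) == process_roundtrip(records)

direct = timeit.timeit(lambda: process_direct(records), number=20)
roundtrip = timeit.timeit(lambda: process_roundtrip(records), number=20)
```

The round-trip path also allocates a new object per element per operation, which is where the extra garbage-collection cost comes from.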
Re: Trying to run Beam on Spark cluster
Thanks Kyle, On Fri, Sep 24, 2021 at 1:48 PM Kyle Weaver wrote: > Hi Mark. Looks like a problem with artifact staging. PortableRunner > implicitly requires a directory (configurable with --artifacts_dir, under > /tmp by default) that is accessible by both the job server and Beam worker. > Hmmm, I guess I could create an NFS share between the machines and use that for the artifacts_dir. But if I use environment_type=DOCKER, the docker image won't have access to that. Is there some easy way to modify the docker command that the worker runs when it starts the docker image to map this directory (via '-v') into the docker image? > You should be able to get around this by using --runner SparkRunner > instead: > > python -m apache_beam.examples.wordcount > gs://datapipeline-output/shakespeare-alls-11.txt --output > gs://datapipeline-output/output/ --project august-ascent-325423 > --environment_type=DOCKER *--runner SparkRunner > --spark_rest_url http://hostname:6066 <http://hostname:6066>* > > This requires you to enable REST on your Spark master by putting > `spark.master.rest.enabled` in your config, and then setting the Beam > pipeline option --spark_rest_url to use its address (6066 is the default > port). > I'll try that next while waiting for the answer above. Thanks Mark > > This starts the job server for you, so you don't need to do that ahead of > time. > > Best, > Kyle > > On Sun, Sep 19, 2021 at 12:22 PM Mark Striebeck > wrote: > >> Hi, >> >> I am trying to run beam on a small spark cluster. I set up Spark (master >> plus one slave). 
I am using the portable runner and invoke the beam >> pipeline with: >> >> python -m apache_beam.examples.wordcount >> gs://datapipeline-output/shakespeare-alls-11.txt --output >> gs://datapipeline-output/output/ --project august-ascent-325423 --runner >> PortableRunner --job_endpoint=localhost:8099 --environment_type=DOCKER >> >> I always get an error: >> Caused by: java.util.concurrent.TimeoutException: Timed out while waiting >> for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null >> apache/beam_python3.8_sdk:2.32.0 --id=4-1 >> --provision_endpoint=localhost:46757' >> >> It takes ~2.5 minutes to pull the beam image which should be enough. But >> I pulled the image manually (docker pull apache/beam_python3.8_sdk:2.32.0) >> and then tried to run the pipeline again. >> >> Now, when I run the pipeline I get an error: >> java.io.FileNotFoundException: >> /tmp/beam-artifact-staging/60321f712323c195764ab31b3e205b228a405fbb80b50fafa67b38b21959c63f/1-ref_Environment_default_e-pickled_main_session >> (No such file or directory) >> >> and then further down >> >> ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) >> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: >> java.lang.IllegalStateException: No container running for id >> 7014a9ea98dc0b3f453a9d3860aff43ba42214195d2240d7cefcefcfabf93879 >> >> (here is the full strack trace: >> https://drive.google.com/file/d/1mRzt8G7I9Akkya48KfAbrqPp8wRCzXDe/view) >> >> Any pointer or idea is appreciated (sorry, if this is something obvious - >> I'm still pretty new to beam/spark). >> >> Thanks >> Mark >> >
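[Editor's note] One workaround for the artifact-staging question raised above (how to get the staging directory visible inside the SDK container) is to run the harness as an EXTERNAL worker pool yourself, with the shared directory bind-mounted via `-v`. The sketch below only builds the `docker run` argv; the paths and image tag are assumptions matching this thread's setup, and both container and host must mount the share at the same path so file URLs recorded by the job server resolve on the worker.

```python
# Hypothetical workaround sketch: start the Beam SDK worker pool
# manually with the NFS-shared artifact staging dir mounted in.

ARTIFACTS_DIR = "/tmp/beam-artifact-staging"  # assumed NFS share

def worker_pool_command(image="apache/beam_python3.8_sdk:2.32.0",
                        artifacts_dir=ARTIFACTS_DIR):
    """Build the `docker run` argv for a worker pool that can read the
    same artifact staging directory as the job server."""
    return [
        "docker", "run", "--network=host",
        # Same path inside and outside the container.
        "-v", "%s:%s" % (artifacts_dir, artifacts_dir),
        image, "--worker_pool",
    ]
```

The pipeline would then be submitted with `--environment_type=EXTERNAL` pointing at the worker pool, instead of letting the runner launch `DOCKER` containers itself.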
Trying to run Beam on Spark cluster
Hi, I am trying to run beam on a small spark cluster. I set up Spark (master plus one slave). I am using the portable runner and invoke the beam pipeline with: python -m apache_beam.examples.wordcount gs://datapipeline-output/shakespeare-alls-11.txt --output gs://datapipeline-output/output/ --project august-ascent-325423 --runner PortableRunner --job_endpoint=localhost:8099 --environment_type=DOCKER I always get an error: Caused by: java.util.concurrent.TimeoutException: Timed out while waiting for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null apache/beam_python3.8_sdk:2.32.0 --id=4-1 --provision_endpoint=localhost:46757' It takes ~2.5 minutes to pull the beam image which should be enough. But I pulled the image manually (docker pull apache/beam_python3.8_sdk:2.32.0) and then tried to run the pipeline again. Now, when I run the pipeline I get an error: java.io.FileNotFoundException: /tmp/beam-artifact-staging/60321f712323c195764ab31b3e205b228a405fbb80b50fafa67b38b21959c63f/1-ref_Environment_default_e-pickled_main_session (No such file or directory) and then further down ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id 7014a9ea98dc0b3f453a9d3860aff43ba42214195d2240d7cefcefcfabf93879 (here is the full stack trace: https://drive.google.com/file/d/1mRzt8G7I9Akkya48KfAbrqPp8wRCzXDe/view) Any pointer or idea is appreciated (sorry, if this is something obvious - I'm still pretty new to beam/spark). Thanks Mark
Re: java.io.InvalidClassException with Spark 3.1.2
Sure. I starred your repository. On Sat, Aug 21, 2021 at 11:27 AM cw wrote: > Hello Yu, >i done lot of testing, it only work for spark 2+, not 3. if you need a > working example on kubernetes, > https://github.com/cometta/python-apache-beam-spark , feel free to > improve the code, if you would like to contribute. help me *star if if it > is useful for you. thank you > > On Monday, August 16, 2021, 12:37:46 AM GMT+8, Yu Watanabe < > yu.w.ten...@gmail.com> wrote: > > > Hello . > > I would like to ask question for spark runner. > > Using spark downloaded from below link, > > > https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz > > I get below error when submitting a pipeline. > Full error is on > https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693. > > > -- > 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection > from /192.168.11.2:35601 > java.io.InvalidClassException: > scala.collection.mutable.WrappedArray$ofRef; local class incompatible: > stream classdesc serialVersionUID = 3456489343829468865, local class > serialVersionUID = 1028182004549731694 > at > java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689) > ... > > -- > > SDK Harness and Job service are deployed as below. > > 1. SDK Harness > > sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 > --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true > > 2. Job service > > sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool > > * apache/beam_spark_job_server:2.31.0 for spark 2.4.8 > > 3. SDK client code > > https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2 > > Spark 2.4.8 succeeded without any errors using above components. > > > https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz > > Would there be any setting which you need to be aware of for spark 3.1.2 ? 
> > Thanks, > Yu Watanabe > > -- > Yu Watanabe > > linkedin: www.linkedin.com/in/yuwatanabe1/ > twitter: twitter.com/yuwtennis > > -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
Re: java.io.InvalidClassException with Spark 3.1.2
Hello Yu, I've done a lot of testing; it only works for Spark 2, not 3. If you need a working example on Kubernetes, see https://github.com/cometta/python-apache-beam-spark ; feel free to improve the code if you would like to contribute. Please star it if it is useful for you. Thank you. On Monday, August 16, 2021, 12:37:46 AM GMT+8, Yu Watanabe wrote: Hello. I would like to ask a question about the spark runner. Using spark downloaded from the link below, https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz I get the error below when submitting a pipeline. The full error is at https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693. -- 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601 java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694 at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689) ... -- SDK Harness and Job service are deployed as below. 1. SDK Harness sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true 2. Job service sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool * apache/beam_spark_job_server:2.31.0 for spark 2.4.8 3. SDK client code https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2 Spark 2.4.8 succeeded without any errors using the above components. https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz Would there be any setting which you need to be aware of for spark 3.1.2? Thanks, Yu Watanabe -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
Re: java.io.InvalidClassException with Spark 3.1.2
Kyle. Thank you. On Tue, Aug 17, 2021 at 5:55 AM Kyle Weaver wrote: > I was able to reproduce the error. I'm not sure why this would happen, > since as far as I can tell the Beam 2.31.0 Spark runner should be using > Spark 3.1.2 and Scala 2.12 [1]. I filed a JIRA issue for it. [2] > > [1] > https://github.com/apache/beam/pull/14897/commits/b6fca2bb79d9e7a69044b477460445456720ec58 > [2] https://issues.apache.org/jira/browse/BEAM-12762 > > > On Sun, Aug 15, 2021 at 9:37 AM Yu Watanabe wrote: > >> Hello . >> >> I would like to ask question for spark runner. >> >> Using spark downloaded from below link, >> >> >> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz >> >> I get below error when submitting a pipeline. >> Full error is on >> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693. >> >> >> -- >> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection >> from /192.168.11.2:35601 >> java.io.InvalidClassException: >> scala.collection.mutable.WrappedArray$ofRef; local class incompatible: >> stream classdesc serialVersionUID = 3456489343829468865, local class >> serialVersionUID = 1028182004549731694 >> at >> java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689) >> ... >> >> ------ >> >> SDK Harness and Job service are deployed as below. >> >> 1. SDK Harness >> >> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 >> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true >> >> 2. Job service >> >> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool >> >> * apache/beam_spark_job_server:2.31.0 for spark 2.4.8 >> >> 3. SDK client code >> >> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2 >> >> Spark 2.4.8 succeeded without any errors using above components. 
>> >> >> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz >> >> Would there be any setting which you need to be aware of for spark 3.1.2 ? >> >> Thanks, >> Yu Watanabe >> >> -- >> Yu Watanabe >> >> linkedin: www.linkedin.com/in/yuwatanabe1/ >> twitter: twitter.com/yuwtennis >> >> > -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
java.io.InvalidClassException with Spark 3.1.2
Hello . I would like to ask question for spark runner. Using spark downloaded from below link, https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz I get below error when submitting a pipeline. Full error is on https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693. -- 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601 java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694 at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689) ... -- SDK Harness and Job service are deployed as below. 1. SDK Harness sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true 2. Job service sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool * apache/beam_spark_job_server:2.31.0 for spark 2.4.8 3. SDK client code https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2 Spark 2.4.8 succeeded without any errors using above components. https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz Would there be any setting which you need to be aware of for spark 3.1.2 ? Thanks, Yu Watanabe -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
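[Editor's note] A `serialVersionUID` mismatch on a Scala collection class, as in the error above, is the classic symptom of mixing Scala binary versions between the job server jar and the Spark distribution. A small hedged helper (not Beam code) can pull both UIDs out of such a log line so the two sides can be compared programmatically:

```python
import re

# Matches the detail message of java.io.InvalidClassException.
INVALID_CLASS = re.compile(
    r"local class incompatible: stream classdesc serialVersionUID = "
    r"(-?\d+), local class serialVersionUID = (-?\d+)")

def serial_uids(log_line):
    """Return (stream_uid, local_uid) from an InvalidClassException
    message, or None if the line does not match."""
    m = INVALID_CLASS.search(log_line)
    return (int(m.group(1)), int(m.group(2))) if m else None

# The exact line reported in this thread:
line = ("java.io.InvalidClassException: "
        "scala.collection.mutable.WrappedArray$ofRef; local class "
        "incompatible: stream classdesc serialVersionUID = "
        "3456489343829468865, local class serialVersionUID = "
        "1028182004549731694")
```

When the two UIDs differ like this, the fix is to align the Scala (and Spark) versions on both sides rather than to touch serialization settings.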
example on python + spark + k8s
Hello, I created an example of running Python code on Apache Beam + Spark + Kubernetes at https://github.com/cometta/python-apache-beam-spark because I was unable to find any step-by-step guide on how to do this for the past few weeks. I hope it will be useful for others who are also looking for a similar implementation. Have a good day. Contributors are welcome to further improve the code. Thank you
Re: Submit Python Beam on Spark Dataproc
Hello Mahan. Sorry for the late reply. > Still waiting for startup of environment from localhost:5 for worker id 1-1 >From the message , it seems that something is wrong with connection between Worker node in spark cluster and SDK harness. According to this slide runner worker (in your context spark worker) , should also have connectivity with sdk harness container. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_0 Could you please also try setting ssh tunneling to spark worker node as well ? Thanks, Yu On Thu, Aug 12, 2021 at 9:07 PM Mahan Hosseinzadeh wrote: > Thanks Yu for the help and the tips. > > I ran the following steps but my job is stuck and can't get submitted to > Dataproc and I keep getting this message in job-server: > Still waiting for startup of environment from localhost:5 for worker > id 1-1 > > > - > *Beam code:* > pipeline_options = PipelineOptions([ > "--runner=PortableRunner", > "--job_endpoint=localhost:8099", > "--environment_type=EXTERNAL", > "--environment_config=localhost:5" > ]) > > - > *Job Server:* > I couldn't use Docker because host networking doesn't work on Mac OS and I > used Gradle instead > > ./gradlew :runners:spark:3:job-server:runShadow > > - > *Beam Worker Pool:* > docker run -p=5:5 apache/beam_python3.7_sdk --worker_pool > > - > *SSH tunnel to the master node:* > gcloud compute ssh \ > --project \ > --zone \ > -- -NL 7077:localhost:7077 > > - > > Thanks, > Mahan > > On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe wrote: > >> Hello . >> >> Would this page help ? I hope it helps. >> >> https://beam.apache.org/documentation/runners/spark/ >> >> > Running on a pre-deployed Spark cluster >> >> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is >> 7077 the master url port? >> * Yes. >> >> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? >> * Job server should be able to communicate with Spark master node port >> 7077. 
So I believe it is Yes. >> >> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK >> Harness Configuration? >> * This is the configuration of how you want your harness container to >> spin up. >> >> https://beam.apache.org/documentation/runtime/sdk-harness-config/ >> >> For DOCKER , you will need docker deployed on all spark worker nodes. >> > User code is executed within a container started on each worker node >> >> I used EXTERNAL when I did it with flink cluster before. >> >> e.g >> >> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14 >> >> 4- Should we run the job-server outside of the Dataproc cluster or should >> we run it in the master node? >> * Depends. It could be inside or outside the master node. But if you are >> connecting to full managed service, then outside might be better. >> >> https://beam.apache.org/documentation/runners/spark/ >> >> > Start JobService that will connect with the Spark master >> >> Thanks, >> Yu >> >> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh >> wrote: >> >>> Hi, >>> >>> I have a Python Beam job that works on Dataflow but we would like to >>> submit it on a Spark Dataproc cluster with no Flink involvement. >>> I already spent days but failed to figure out how to use PortableRunner >>> with the beam_spark_job_server to submit my Python Beam job to Spark >>> Dataproc. All the Beam docs are about Flink and there is no guideline about >>> Spark with Dataproc. >>> Some relevant questions might be: >>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is >>> 7077 the master url port? >>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? >>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK >>> Harness Configuration? >>> 4- Should we run the job-server outside of the Dataproc cluster or >>> should we run it in the master node? 
>>> >>> Thanks, >>> Mahan >>> >> >> >> -- >> Yu Watanabe >> >> linkedin: www.linkedin.com/in/yuwatanabe1/ >> twitter: twitter.com/yuwtennis >> >> > -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
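The setup debugged in this thread boils down to a small set of portable-runner flags. Below is a hedged sketch of that flag list; the host names and the worker-pool port are illustrative placeholders (the ports in the quoted mails are truncated), and the `flag_value` helper is hypothetical, included only to make the sketch checkable:

```python
# Flags for submitting a Python pipeline through the Spark job server,
# as discussed above. Addresses are illustrative; the worker-pool
# address must be reachable from every Spark worker node, which is the
# connectivity problem diagnosed in this thread.
PORTABLE_FLAGS = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",         # Spark job server endpoint
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",  # Beam worker pool (illustrative port)
]

def flag_value(flags, name):
    # Hypothetical helper: extract the value of a "--name=value" flag.
    prefix = "--" + name + "="
    for flag in flags:
        if flag.startswith(prefix):
            return flag[len(prefix):]
    return None
```

In a real pipeline these strings would be passed to `PipelineOptions`, as in Mahan's snippet above.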
Re: Submit Python Beam on Spark Dataproc
Thanks Yu for the help and the tips. I ran the following steps but my job is stuck and can't get submitted to Dataproc and I keep getting this message in job-server: Still waiting for startup of environment from localhost:5 for worker id 1-1 - *Beam code:* pipeline_options = PipelineOptions([ "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=localhost:5" ]) - *Job Server:* I couldn't use Docker because host networking doesn't work on Mac OS and I used Gradle instead ./gradlew :runners:spark:3:job-server:runShadow - *Beam Worker Pool:* docker run -p=5:5 apache/beam_python3.7_sdk --worker_pool - *SSH tunnel to the master node:* gcloud compute ssh \ --project \ --zone \ -- -NL 7077:localhost:7077 - Thanks, Mahan On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe wrote: > Hello . > > Would this page help ? I hope it helps. > > https://beam.apache.org/documentation/runners/spark/ > > > Running on a pre-deployed Spark cluster > > 1- What's spark-master-url in case of a remote cluster on Dataproc? Is > 7077 the master url port? > * Yes. > > 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? > * Job server should be able to communicate with Spark master node port > 7077. So I believe it is Yes. > > 3- What's the environment_type? Can we use DOCKER? Then what's the SDK > Harness Configuration? > * This is the configuration of how you want your harness container to > spin up. > > https://beam.apache.org/documentation/runtime/sdk-harness-config/ > > For DOCKER , you will need docker deployed on all spark worker nodes. > > User code is executed within a container started on each worker node > > I used EXTERNAL when I did it with flink cluster before. > > e.g > > https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14 > > 4- Should we run the job-server outside of the Dataproc cluster or should > we run it in the master node? > * Depends. 
It could be inside or outside the master node. But if you are > connecting to full managed service, then outside might be better. > > https://beam.apache.org/documentation/runners/spark/ > > > Start JobService that will connect with the Spark master > > Thanks, > Yu > > On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh > wrote: > >> Hi, >> >> I have a Python Beam job that works on Dataflow but we would like to >> submit it on a Spark Dataproc cluster with no Flink involvement. >> I already spent days but failed to figure out how to use PortableRunner >> with the beam_spark_job_server to submit my Python Beam job to Spark >> Dataproc. All the Beam docs are about Flink and there is no guideline about >> Spark with Dataproc. >> Some relevant questions might be: >> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is >> 7077 the master url port? >> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? >> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK >> Harness Configuration? >> 4- Should we run the job-server outside of the Dataproc cluster or should >> we run it in the master node? >> >> Thanks, >> Mahan >> > > > -- > Yu Watanabe > > linkedin: www.linkedin.com/in/yuwatanabe1/ > twitter: twitter.com/yuwtennis > >
Re: Submit Python Beam on Spark Dataproc
Hello. Would this page help? I hope it helps. https://beam.apache.org/documentation/runners/spark/ > Running on a pre-deployed Spark cluster 1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077 the master url port? * Yes. 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? * The job server should be able to communicate with the Spark master node on port 7077, so I believe the answer is yes. 3- What's the environment_type? Can we use DOCKER? Then what's the SDK Harness Configuration? * This is the configuration of how you want your harness container to spin up. https://beam.apache.org/documentation/runtime/sdk-harness-config/ For DOCKER, you will need Docker deployed on all Spark worker nodes. > User code is executed within a container started on each worker node I used EXTERNAL when I did this with a Flink cluster before. e.g. https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14 4- Should we run the job-server outside of the Dataproc cluster or should we run it in the master node? * It depends. It could be inside or outside the master node. But if you are connecting to a fully managed service, then outside might be better. https://beam.apache.org/documentation/runners/spark/ > Start JobService that will connect with the Spark master Thanks, Yu On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh wrote: > Hi, > > I have a Python Beam job that works on Dataflow but we would like to > submit it on a Spark Dataproc cluster with no Flink involvement. > I already spent days but failed to figure out how to use PortableRunner > with the beam_spark_job_server to submit my Python Beam job to Spark > Dataproc. All the Beam docs are about Flink and there is no guideline about > Spark with Dataproc. > Some relevant questions might be: > 1- What's spark-master-url in case of a remote cluster on Dataproc? Is > 7077 the master url port? > 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh? 
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK > Harness Configuration? > 4- Should we run the job-server outside of the Dataproc cluster or should > we run it in the master node? > > Thanks, > Mahan > -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis
Submit Python Beam on Spark Dataproc
Hi, I have a Python Beam job that works on Dataflow, but we would like to submit it to a Spark Dataproc cluster with no Flink involvement. I have already spent days but failed to figure out how to use the PortableRunner with the beam_spark_job_server to submit my Python Beam job to Spark on Dataproc. All the Beam docs are about Flink, and there is no guideline about Spark with Dataproc. Some relevant questions might be: 1- What's the spark-master-url in the case of a remote cluster on Dataproc? Is 7077 the master URL port? 2- Should we SSH tunnel to the sparkMasterUrl port using gcloud compute ssh? 3- What's the environment_type? Can we use DOCKER? Then what's the SDK Harness Configuration? 4- Should we run the job-server outside of the Dataproc cluster, or should we run it on the master node? Thanks, Mahan
Re: Perf issue with Beam on spark (spark runner)
> On 5 Aug 2021, at 18:17, Tao Li wrote: > > It was a great presentation! Thanks! > Regarding my perf testing, I was not doing aggregation, filtering, > projection or joining. I was simply reading all the fields of parquet and > then immediately save PCollection back to parquet. Well, of course, if you read all fields (columns) then you don’t need column projection. Otherwise, it can give a quite significant performance boost, especially for large tables with many columns. > Regarding SDF translation, is it enabled by default? From Beam 2.30.0 release notes: "Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based Read transforms ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))” — Alexey > I will check out ParquetIO splittable. Thanks! > > From: Alexey Romanenko > Date: Thursday, August 5, 2021 at 6:40 AM > To: Tao Li > Cc: "user@beam.apache.org" , Andrew Pilloud > , Ismaël Mejía , Kyle Weaver > , Yuchu Cao > Subject: Re: Perf issue with Beam on spark (spark runner) > > It’s very likely that Spark SQL may have much better performance because of > SQL push-downs and avoiding additional ser/deser operations. > > In the same time, did you try to leverage "withProjection()” in ParquetIO and > project only the fields that you needed? > > Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])? > > Also, using SDF translation for Read on Spark Runner can cause performance > degradation as well (we noticed that in our experiments). Try to use non-SDF > read (if not yet) [2] > > > PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m > not sure if a recording is already available but you can find the slides here > [3] that can be helpful. 
> > > — > Alexey > > [1] https://issues.apache.org/jira/browse/BEAM-12070 > [2] https://issues.apache.org/jira/browse/BEAM-10670 > [3] > https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing > > > >> On 5 Aug 2021, at 03:07, Tao Li mailto:t...@zillow.com>> >> wrote: >> >> @Alexey Romanenko <mailto:aromanenko@gmail.com> @Ismaël Mejía >> <mailto:ieme...@gmail.com> I assume you are experts on spark runner. 
Can you >> please take a look at this thread and confirm this jira covers the causes >> https://issues.apache.org/jira/browse/BEAM-12646 >> ? >> >> This perf issue is currently a blocker to me.. >> >> Thanks so much! >> >> From: Tao Li mailto:t...@zillow.com>> >> Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" >> mailto:user@beam.apache.org>> >> Date: Friday, July 30, 2021 at 3:53 PM >> To: Andrew Pilloud mailto:apill...@google.com>>, >> "user@beam.apache.org <mailto:user@beam.apache.org>" > <mailto:user@beam.apache.org>> >> Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu >> Cao mailto:yuc...@trulia.com>
Re: Spark Structured Streaming runner migrated to Spark 3
Hooray! Thanks, Etienne! On Thu, Aug 5, 2021 at 3:11 AM Etienne Chauchot wrote: > Hi all, > > Just to let you know that Spark Structured Streaming runner was migrated > to Spark 3. > > Enjoy ! > > Etienne > >
Re: Perf issue with Beam on spark (spark runner)
Hi Alexey, It was a great presentation! Regarding my perf testing, I was not doing aggregation, filtering, projection or joining. I was simply reading all the fields of parquet and then immediately save PCollection back to parquet. Regarding SDF translation, is it enabled by default? I will check out ParquetIO splittable. Thanks! From: Alexey Romanenko Date: Thursday, August 5, 2021 at 6:40 AM To: Tao Li Cc: "user@beam.apache.org" , Andrew Pilloud , Ismaël Mejía , Kyle Weaver , Yuchu Cao Subject: Re: Perf issue with Beam on spark (spark runner) It’s very likely that Spark SQL may have much better performance because of SQL push-downs and avoiding additional ser/deser operations. In the same time, did you try to leverage "withProjection()” in ParquetIO and project only the fields that you needed? Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])? Also, using SDF translation for Read on Spark Runner can cause performance degradation as well (we noticed that in our experiments). Try to use non-SDF read (if not yet) [2] PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m not sure if a recording is already available but you can find the slides here [3] that can be helpful. 
— Alexey [1] https://issues.apache.org/jira/browse/BEAM-12070 [2] https://issues.apache.org/jira/browse/BEAM-10670 [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing On 5 Aug 2021, at 03:07, Tao Li mailto:t...@zillow.com>> wrote: @Alexey Romanenko<mailto:aromanenko@gmail.com> @Ismaël Mejía<mailto:ieme...@gmail.com> I assume you are experts on spark runner. 
Can you please take a look at this thread and confirm this jira covers the causes https://issues.apache.org/jira/browse/BEAM-12646 ? This perf issue is currently a blocker to me.. Thanks so much! From: Tao Li mailto:t...@zillow.com>> Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" mailto:user@beam.apache.org>> Date: Friday, July 30, 2021 at 3:53 PM To: Andrew Pilloud mailto:apill...@google.com>>, "user@beam.apache.org<mailto:user@beam.apache.org>" mailto:user@beam.apache.org>> Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu Cao mailto:yuc...@trulia.com>> Subject: Re: Perf issue with Beam on spark (spark runner) Thanks everyone for your help. We actually did another round of perf comparison between Beam (on Spark) and native Spark, without any projection/filtering in the query (to rule out the "predicate pushdown" factor). The time spent by Beam with the Spark runner is still 3-5x that of native Spark, and the cause is https://issues.apache.org/jira/browse/BEAM-12646 according to the Spark metrics. The Spark runner is pretty much the bottleneck. From: Andrew Pilloud mailto:apill...@google.com>> Date: Thursday, July 29,
Re: Perf issue with Beam on spark (spark runner)
It’s very likely that Spark SQL may have much better performance because of SQL push-downs and avoiding additional ser/deser operations. At the same time, did you try to leverage "withProjection()" in ParquetIO and project only the fields that you need? Did you use the splittable ParquetIO (it's not enabled by default, fixed in [1])? Also, using the SDF translation for Read on the Spark runner can cause performance degradation as well (we noticed that in our experiments). Try to use the non-SDF read (if you haven't yet) [2]. PS: Yesterday, at Beam Summit, we (Ismael and I) gave a related talk. I’m not sure if a recording is available yet, but you can find the slides here [3]; they may be helpful. — Alexey [1] https://issues.apache.org/jira/browse/BEAM-12070 [2] https://issues.apache.org/jira/browse/BEAM-10670 [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing > On 5 Aug 2021, at 03:07, Tao Li wrote: > > @Alexey Romanenko <mailto:aromanenko@gmail.com> @Ismaël Mejía > <mailto:ieme...@gmail.com> I assume you are experts on spark runner. Can you > please take a look at this thread and confirm this jira covers the causes > https://issues.apache.org/jira/browse/BEAM-12646 > ? > > This perf issue is currently a blocker to me.. > > Thanks so much! 
> > From: Tao Li mailto:t...@zillow.com>> > Reply-To: "user@beam.apache.org <mailto:user@beam.apache.org>" > mailto:user@beam.apache.org>> > Date: Friday, July 30, 2021 at 3:53 PM > To: Andrew Pilloud mailto:apill...@google.com>>, > "user@beam.apache.org <mailto:user@beam.apache.org>" <mailto:user@beam.apache.org>> > Cc: Kyle Weaver mailto:kcwea...@google.com>>, Yuchu Cao > mailto:yuc...@trulia.com>> > Subject: Re: Perf issue with Beam on spark (spark runner) > > Thanks everyone for your help. > > We actually did another round of perf comparison between Beam (on spark) and > native spark, without any projection/filtering in the query (to rule out the > “predicate pushdown” factor). > > The time spent on Beam with spark runner is still taking 3-5x period of time > compared with native spark, and the cause > is https://issues.apache.org/jira/browse/BEAM-12646 > according to the spark metrics. Spark runner is pretty much the bottleneck. > > > > From: Andrew Pilloud mailto:apill...@google.com>> > Date: Thursday, July 29, 2021 at 2:11 PM > To: "user@beam.apache.org <mailto:user@beam.apache.org>" > mailto:user@beam.apache.org>> > Cc: Tao Li mailto:t...@zillow.com>>, Kyle Weaver > mailto:kcwea...@google.com>>, Yuchu Cao > mailto:yuc...@trulia.com>> > Subject: Re: Perf issue with Beam on spark (spark runner) > > Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0. > > Andrew > > On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <mailto:apill...@google.com>> wrote: >> Beam SQL doesn't currently have project pushdown for ParquetIO (we are >> working to expand this to more IOs). 
Using ParquetIO withProjection directly >> will produce better results. >> >> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw > <mailto:rober...@google.com>> wrote: >>> Could you try using Beam SQL [1] and see if that gives more similar result >>> to your Spark SQL query? I would also be curious if the performance is >>> sufficient using withProjection to only read the auction, price, and bidder >>> columns. >>> >>> [1] https://beam.apache.org/documentation/dsls/sql/overview/
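The column-projection advice in this thread (ParquetIO's `withProjection`, i.e. reading only the fields you need instead of whole rows) can be illustrated with a minimal pure-Python sketch. The `project` helper is hypothetical, a stand-in for what the IO does at the file level, and the column names follow the auction/price/bidder example mentioned above:

```python
# Minimal illustration of column projection, the optimization discussed
# above: keep only the requested fields of each record rather than
# carrying whole rows through the pipeline. Hypothetical helper; real
# projection happens inside the Parquet reader, before rows are built.
def project(records, columns):
    # Drop every field that is not in `columns`.
    return [{k: r[k] for k in columns if k in r} for r in records]

bids = [
    {"auction": 1, "price": 100, "bidder": "a", "extra": "unused"},
    {"auction": 2, "price": 250, "bidder": "b", "extra": "unused"},
]
projected = project(bids, ["auction", "price", "bidder"])
```

With a wide table, doing this at the reader level (as `withProjection` does) avoids ever deserializing the unused columns, which is where the performance boost comes from.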
Spark Structured Streaming runner migrated to Spark 3
Hi all, Just to let you know that the Spark Structured Streaming runner was migrated to Spark 3. Enjoy! Etienne
Re: Spark Structured Streaming Runner Roadmap
Hi, Sorry for the late answer: the streaming mode in the Spark Structured Streaming runner is stuck because of the Spark Structured Streaming framework's implementation of watermarks on the Apache Spark project side. See https://echauchot.blogspot.com/2020/11/watermark-architecture-proposal-for.html Best, Etienne On 20/05/2021 20:37, Yu Zhang wrote: Hi Beam Community, Would there be any roadmap for the Spark Structured Streaming runner to support streaming and the Splittable DoFn API? Like a specific timeline or release version. Thanks, Yu
example on running python apache beam with standalone spark
I start the Spark Docker containers with these commands on the host machine: curl -LO https://raw.githubusercontent.com/bitnami/bitnami-docker-spark/master/docker-compose.yml # edit and expose port 7077 to host machine $ docker-compose up Then, I run docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077 on the host machine. Lastly, I run this command on the host machine: python -m apache_beam.examples.wordcount --input ./a_file_input \ --output ./counts \ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK I get the errors below from the beam_spark_job_server docker container: 21/07/28 13:41:56 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job BeamApp-root-0728134156-a85a64c5_019bd04e-aabc-4deb-8b12-8c61f173cd5b 21/07/28 13:41:56 INFO org.apache.beam.runners.jobsubmission.JobInvocation: Starting job invocation BeamApp-root-0728134156-a85a64c5_019bd04e-aabc-4deb-8b12-8c61f173cd5b 21/07/28 13:41:56 INFO org.apache.beam.runners.core.construction.resources.PipelineResources: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 6 files. Enable logging at DEBUG level to see which files will be staged. 21/07/28 13:41:57 INFO org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a brand new Spark Context. 21/07/28 13:41:57 WARN org.apache.spark.util.Utils: Your hostname, cometstrike resolves to a loopback address: 127.0.1.1; using 192.168. instead (on interface wlo1) 21/07/28 13:41:57 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 21/07/28 13:41:57 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 21/07/28 13:42:57 ERROR org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 
21/07/28 13:42:57 WARN org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application ID is not initialized yet. 21/07/28 13:42:57 WARN org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master 21/07/28 13:42:57 ERROR org.apache.spark.SparkContext: Error initializing SparkContext. java.lang.NullPointerException at org.apache.spark.SparkContext.(SparkContext.scala:560) at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:58) at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101) at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67) at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:118) at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 21/07/28 13:42:57 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener AppStatusListener threw an exception java.lang.NullPointerException at org.apache.spark.status.AppStatusListener.onApplicationEnd(AppStatusListener.scala:167) at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57) at 
org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91) at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92) at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92) at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87) at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58
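The "All masters are unresponsive" error above usually means the job server cannot reach the Spark master at the configured address. A minimal connectivity check, before digging into Spark itself, is to test whether the master port accepts TCP connections from the machine the job server runs on. The helper below is a hypothetical sketch of that check (host and port mirror the spark://localhost:7077 URL used in this thread):

```python
# Quick TCP reachability probe for a Spark master (or any endpoint).
# Returns True only if a connection succeeds within the timeout; DNS
# failures and refused/filtered connections all come back as False.
import socket

def port_open(host, port, timeout=2.0):
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g. port_open("localhost", 7077) before starting the job server
```

If this returns False from inside the job server's network namespace (e.g. inside its container), the problem is networking, not Beam.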
Re: spark-submit and the portable runner
Thanks. That was the issue. The latest release of Beam indicates it supports Spark 3 and I find references in the code to Hadoop 3.2.1. Is it possible to configure beam to run on Hadoop 3.2.1? Trevor On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver wrote: > Looks like a version mismatch between Hadoop dependencies [1]. Beam's > Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which > Spark and Hadoop versions are your cluster using? > > [1] > https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h > [2] https://issues.apache.org/jira/browse/BEAM-12094 > > On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer > wrote: > >> Does anyone have an example of using spark-submit to run a beam job using >> the portable runner? The documentation indicates this is possible but >> doesn't give an example of how to do it. >> >> I am using the following pipeline options to generate the jar >> >> options = PipelineOptions(['--runner=SparkRunner', >>'--environment_type=DOCKER', >>'--environment_config=path-to-image:latest', >>'--output_executable_path=output.jar' >>]) >> >> and am trying to run it with >> >> spark-submit --master yarn --deploy-mode cluster --class >> org.apache.beam.runners.spark.SparkPipelineRunner output.jar >> >> >> but I get the following error >> >> >> Exception in thread "main" java.lang.IllegalAccessError: class >> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface >> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator >> >> at java.lang.ClassLoader.defineClass1(Native Method) >> >> at java.lang.ClassLoader.defineClass(ClassLoader.java:757) >> >> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) >> >> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) >> >> at java.net.URLClassLoader.access$100(URLClassLoader.java:74) >> >> at java.net.URLClassLoader$1.run(URLClassLoader.java:369) >> >> at 
java.net.URLClassLoader$1.run(URLClassLoader.java:363) >> >> at java.security.AccessController.doPrivileged(Native Method) >> >> at java.net.URLClassLoader.findClass(URLClassLoader.java:362) >> >> at java.lang.ClassLoader.loadClass(ClassLoader.java:419) >> >> at java.lang.ClassLoader.loadClass(ClassLoader.java:352) >> >> at java.lang.Class.forName0(Native Method) >> >> at java.lang.Class.forName(Class.java:348) >> >> at >> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370) >> >> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) >> >> at java.util.ServiceLoader$1.next(ServiceLoader.java:480) >> >> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268) >> >> at >> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313) >> >> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352) >> >> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123) >> >> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) >> >> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) >> >> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482) >> >> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230) >> >> at >> org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138) >> >> at scala.Option.getOrElse(Option.scala:189) >> >> at org.apache.spark.deploy.yarn.Client.(Client.scala:138) >> >> at >> org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526) >> >> at org.apache.spark.deploy.SparkSubmit.org >> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853) >> >> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) >> >> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) >> >> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) >> >> at >> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928) >> >> at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937) >> >> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >> >>
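Kyle's diagnosis above was a version mismatch: Beam's Python wrapper was pinned to Spark 2.4.8 while the cluster ran a newer Spark/Hadoop stack. A tiny, hypothetical pre-submit sanity check along those lines (version strings are illustrative):

```python
# Hypothetical helper: compare client-side and cluster-side version
# strings before submitting, since mismatched major versions were the
# root cause of the IllegalAccessError in this thread.
def version_tuple(v):
    # "3.2.1" -> (3, 2, 1); tolerates suffixes like "2.4.8-bin".
    parts = []
    for piece in v.split("."):
        digits = "".join(ch for ch in piece if ch.isdigit())
        if digits:
            parts.append(int(digits))
    return tuple(parts)

def same_major(client_version, cluster_version):
    # Agreement on the major version is the minimum bar for mixing
    # Spark/Hadoop artifacts from different sources.
    return version_tuple(client_version)[0] == version_tuple(cluster_version)[0]
```

This does not replace checking the Beam release notes for the supported Spark versions, but it catches the 2.x-vs-3.x class of mismatch early.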
spark-submit and the portable runner
Does anyone have an example of using spark-submit to run a beam job using the portable runner? The documentation indicates this is possible but doesn't give an example of how to do it. I am using the following pipeline options to generate the jar options = PipelineOptions(['--runner=SparkRunner', '--environment_type=DOCKER', '--environment_config=path-to-image:latest', '--output_executable_path=output.jar' ]) and am trying to run it with spark-submit --master yarn --deploy-mode cluster --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar but I get the following error Exception in thread "main" java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:757) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at java.lang.ClassLoader.loadClass(ClassLoader.java:419) at java.lang.ClassLoader.loadClass(ClassLoader.java:352) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352) at 
org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230) at org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.deploy.yarn.Client.(Client.scala:138) at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526) at org.apache.spark.deploy.SparkSubmit.org $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
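For anyone searching later, the intended flow is roughly the one below. The option values are the placeholders from the question; the two `userClassPathFirst` settings are an assumption on my part — they are sometimes used to work around Hadoop classpath conflicts like the `HftpFileSystem` error above, but they can introduce other conflicts, so treat this as a sketch rather than a known fix:

```
# Generate the self-executing jar (Python SDK); values are placeholders.
python -m my_pipeline \
  --runner=SparkRunner \
  --environment_type=DOCKER \
  --environment_config=path-to-image:latest \
  --output_executable_path=output.jar

# Submit it. The jar built by --output_executable_path is normally
# self-executing, so --class may be redundant.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  output.jar
```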
Re: How to specify a spark config with Beam spark runner
Hi Alexey, Thanks, we will give it a try. From: Alexey Romanenko Reply-To: "user@beam.apache.org" Date: Thursday, June 10, 2021 at 5:14 AM To: "user@beam.apache.org" Subject: Re: How to specify a spark config with Beam spark runner Hi Tao, "Limited spark options”, that you mentioned, are Beam's application arguments and if you run your job via "spark-submit" you should still be able to configure Spark application via normal spark-submit “--conf key=value” CLI option. Doesn’t it work for you? — Alexey On 10 Jun 2021, at 01:29, Tao Li mailto:t...@zillow.com>> wrote: Hi Beam community, We are trying to specify a spark config “spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit command for a beam app. I only see limited spark options supported according to this doc: https://beam.apache.org/documentation/runners/spark/ How can we specify an arbitrary spark config? Please advise. Thanks!
Re: How to specify a spark config with Beam spark runner
Hi Tao, "Limited spark options”, that you mentioned, are Beam's application arguments and if you run your job via "spark-submit" you should still be able to configure Spark application via normal spark-submit “--conf key=value” CLI option. Doesn’t it work for you? — Alexey > On 10 Jun 2021, at 01:29, Tao Li wrote: > > Hi Beam community, > > We are trying to specify a spark config > “spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit > command for a beam app. I only see limited spark options supported according > to this doc: https://beam.apache.org/documentation/runners/spark/ > <https://beam.apache.org/documentation/runners/spark/> > > How can we specify an arbitrary spark config? Please advise. Thanks!
How to specify a spark config with Beam spark runner
Hi Beam community, We are trying to specify a spark config “spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl” in the spark-submit command for a beam app. I only see limited spark options supported according to this doc: https://beam.apache.org/documentation/runners/spark/ How can we specify an arbitrary spark config? Please advise. Thanks!
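In case it helps future readers, the point made elsewhere in this thread is that Beam's pipeline options and Spark's configuration are passed separately: arbitrary Spark settings go through spark-submit's own `--conf` flag rather than through Beam. A sketch (the jar name is a placeholder):

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.hadoop.fs.s3a.canned.acl=BucketOwnerFullControl \
  my-beam-app.jar
```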
Re: JdbcIO parallel read on spark
Quick update. After some testing, we have noticed that the splittable JdbcIO-poc works well when the number of splits does not exceed the number of spark tasks. In cases where the number of splits do exceed the task count, the pipeline freezes after each worker has processed a single split each. In our case, we are attempting to read 895676 rows, with 1 row splits. We are running this on a single Spark worker for testing purposes, and the stage is split into 8 tasks. The pipeline freezes after 8 splits have been processed. We are able to verify that the @ProcessElements-function returns without exceptions. On the direct-runner, we noticed that setting the split size to less than the checkpoint-size (hardcoded to 100) mitigated this problem. Any higher value will cause the pipeline to freeze just like with the SparkRunner. Is there anyone who can help shine a light on this? On Wed, May 26, 2021 at 7:50 AM Thomas Fredriksen(External) < thomas.fredrik...@cognite.com> wrote: > There is no forking after the "Generate Queries" transform. > > We noticed that the "Generate Queries" transform is in a different stage > than the reading itself. This is likely due to the Reparallelize-transform, > and we also see this with JdbcIO.readAll. > > After reading up on Splittable DoFn's, we decided to give it a try. We > essentially copied the source of JdbcIO into our project and changed > `ReadFn` into `SplittableJdbcIO` which acts as (more or less) a drop-in > replacement (see source below). > > The DAG here is seemingly simpler with a single stage containing all steps > from reading the DB to writing. We are also seeing that the job is > parallelizing much better than before. > > class SplittableJdbcIO { >> /* ... 
*/ >> @ProcessElement >> public void processElement(@Element ParameterT element, >>RestrictionTracker >> tracker, >>OutputReceiver out) throws >> Exception { >> if (connection == null) { >> connection = dataSource.getConnection(); >> } >> >> if >> (!tracker.tryClaim(tracker.currentRestriction().getFrom())) { >> LOG.error("Failed to claim restriction"); >> ProcessContinuation.stop(); >> } >> >> LOG.info("Preparing query. fetchSize={}, shardSize={}, >> from={}, to={}", fetchSize, shardSize, >> tracker.currentRestriction().getFrom(), >> tracker.currentRestriction().getTo()); >> >> String executeQuery = String.format("SELECT * FROM (%s) t >> OFFSET ? ROWS FETCH NEXT ? ROWS ONLY;", query.get()); >> >> // PostgreSQL requires autocommit to be disabled to enable >> cursor streaming >> // see >> https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor >> LOG.debug("Autocommit has been disabled"); >> connection.setAutoCommit(false); >> try (PreparedStatement statement = >> connection.prepareStatement(executeQuery, ResultSet.TYPE_FORWARD_ONLY, >> ResultSet.CONCUR_READ_ONLY)) { >> statement.setFetchSize(fetchSize); >> parameterSetter.setParameters(element, statement); >> >> >> statement.setLong(statement.getParameterMetaData().getParameterCount() - 1, >> tracker.currentRestriction().getFrom()); >> >> statement.setLong(statement.getParameterMetaData().getParameterCount(), >> tracker.currentRestriction().getTo() - >> tracker.currentRestriction().getFrom()); >> >> queryCounter.inc(); >> >> int count = 0; >> long t0 = Instant.now().getMillis(); >> >> try (ResultSet resultSet = statement.executeQuery()) { >> queryLatency.update(Instant.now().getMillis() - t0); >> LOG.info("Query took {} ms", >> Instant.now().getMillis() - t0); >> >> while (resultSet.next()) { >> out.output(rowMapper.mapRow(resultSet)); >> >> rowCounter.inc(); >> count++; >> } >> LOG.info("Fetched {} rows", count); >> >> } >> } >> } >> >> @SplitRestriction >> public void splitRestriction(@Element 
ParameterT element, >>
Re: JdbcIO parallel read on spark
try (ResultSet resultSet = statement.executeQuery()) { > resultSet.next(); > > long result = resultSet.getLong(1); > LOG.info("The query results in {} rows", result); > return new OffsetRange(0, result); > } > } > } > } > } > On Tue, May 25, 2021 at 6:35 PM Alexey Romanenko wrote: > Hi, > > Did you check a Spark DAG if it doesn’t fork branches after "Genereate > queries” transform? > > — > Alexey > > On 24 May 2021, at 20:32, Thomas Fredriksen(External) < > thomas.fredrik...@cognite.com> wrote: > > Hi there, > > We are struggling to get the JdbcIO-connector to read a large table on > spark. > > In short - we wish to read a large table (several billion rows), transform > then write the transformed data to a new table. > > We are aware that `JdbcIO.read()` does not parallelize. In order to solve > this, we attempted to create ranges then generate `limit/offset` queries > and use `JdbcIO.readAll()` instead. > > The overall steps look something like this (sanitized for readability): > > ``` > pipeline > .apply("Read row count", JdbcIo.read() > .withQuery("select count(*) from MYTABLE;") > .withCoder(VarLongCoder.of()) > .withOtherOptions(...)) > .apply("Genereate queries", ParDo.of(new DoFn() {...}) // > Outputs table offsets > .apply("Read results", JdbcIO.readAll() > .withCoder(SchemaCoder.of(...)) > .withOutputParallelization(false) > .withQuery("select * from MYTABLE offset ? limit MYLIMIT;") > .withParameterSetter((element, statement) -> statement.setLong(1, > element)) > .withOtherOptions(...)) > .apply("more steps", ...); > ``` > > The problem is that this does not seem to parallelize on the spark runner. > Only a single worker seem to be doing all the work. > > We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`, > however this did not seem to make a difference. > > Our goal is to avoid all data from the query be cached in memory between > the read and transform operations. This causes OOM-exceptions. 
Having a > single worker reading the database is okay as long as other workers can > process the data as soon as it is read and not having to wait for all the > data to be ready. > > Any advice on how we approach this. > > Best Regards > Thomas Li Fredriksen > > >
Re: JdbcIO parallel read on spark
Hi, Did you check a Spark DAG if it doesn’t fork branches after "Genereate queries” transform? — Alexey > On 24 May 2021, at 20:32, Thomas Fredriksen(External) > wrote: > > Hi there, > > We are struggling to get the JdbcIO-connector to read a large table on spark. > > In short - we wish to read a large table (several billion rows), transform > then write the transformed data to a new table. > > We are aware that `JdbcIO.read()` does not parallelize. In order to solve > this, we attempted to create ranges then generate `limit/offset` queries and > use `JdbcIO.readAll()` instead. > > The overall steps look something like this (sanitized for readability): > > ``` > pipeline > .apply("Read row count", JdbcIo.read() > .withQuery("select count(*) from MYTABLE;") > .withCoder(VarLongCoder.of()) > .withOtherOptions(...)) > .apply("Genereate queries", ParDo.of(new DoFn() {...}) // > Outputs table offsets > .apply("Read results", JdbcIO.readAll() > .withCoder(SchemaCoder.of(...)) > .withOutputParallelization(false) > .withQuery("select * from MYTABLE offset ? limit MYLIMIT;") > .withParameterSetter((element, statement) -> statement.setLong(1, > element)) > .withOtherOptions(...)) > .apply("more steps", ...); > ``` > > The problem is that this does not seem to parallelize on the spark runner. > Only a single worker seem to be doing all the work. > > We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`, > however this did not seem to make a difference. > > Our goal is to avoid all data from the query be cached in memory between the > read and transform operations. This causes OOM-exceptions. Having a single > worker reading the database is okay as long as other workers can process the > data as soon as it is read and not having to wait for all the data to be > ready. > > Any advice on how we approach this. > > Best Regards > Thomas Li Fredriksen
JdbcIO parallel read on spark
Hi there, We are struggling to get the JdbcIO connector to read a large table on Spark. In short, we wish to read a large table (several billion rows), transform it, then write the transformed data to a new table. We are aware that `JdbcIO.read()` does not parallelize. To work around this, we attempted to create ranges, generate `limit/offset` queries, and use `JdbcIO.readAll()` instead. The overall steps look something like this (sanitized for readability):

```
pipeline
    .apply("Read row count", JdbcIo.read()
        .withQuery("select count(*) from MYTABLE;")
        .withCoder(VarLongCoder.of())
        .withOtherOptions(...))
    .apply("Genereate queries", ParDo.of(new DoFn() {...})) // Outputs table offsets
    .apply("Read results", JdbcIO.readAll()
        .withCoder(SchemaCoder.of(...))
        .withOutputParallelization(false)
        .withQuery("select * from MYTABLE offset ? limit MYLIMIT;")
        .withParameterSetter((element, statement) -> statement.setLong(1, element))
        .withOtherOptions(...))
    .apply("more steps", ...);
```

The problem is that this does not seem to parallelize on the Spark runner; only a single worker seems to be doing all the work. We have tried to break fusion using a variant of `JdbcIO.Reparallelize()`, but this did not seem to make a difference. Our goal is to avoid all data from the query being cached in memory between the read and transform operations, which causes OOM exceptions. Having a single worker read the database is okay as long as the other workers can process the data as soon as it is read, without waiting for all the data to be ready. Any advice on how we should approach this? Best Regards Thomas Li Fredriksen
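The "generate queries" step above reduces to turning a row count into the list of offsets for the `offset ? limit MYLIMIT` query. A minimal, runner-independent sketch of just that arithmetic (plain Python, not Beam API; names are illustrative):

```python
def generate_offsets(row_count: int, batch_size: int) -> list:
    """OFFSET values for `select * from T offset ? limit batch_size` queries
    that together cover row_count rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return list(range(0, row_count, batch_size))

# 10 rows in batches of 4 -> queries at offsets 0, 4 and 8
print(generate_offsets(10, 4))  # [0, 4, 8]
```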
Spark Structured Streaming Runner Roadmap
Hi Beam Community, Is there a roadmap for the Spark Structured Streaming runner to support streaming mode and the Splittable DoFn API, such as a specific timeline or release version? Thanks, Yu
Re: Does SnowflakeIO support spark runner
Thanks Kyle! From: Kyle Weaver Date: Thursday, May 6, 2021 at 12:19 PM To: Tao Li Cc: "user@beam.apache.org" , Anuj Gandhi Subject: Re: Does SnowflakeIO support spark runner Yeah, I'm pretty sure that documentation is just misleading. All of the options from --runner onward are runner-specific and don't have anything to do with Snowflake, so they should probably be removed from the doc. On Thu, May 6, 2021 at 12:06 PM Tao Li mailto:t...@zillow.com>> wrote: Hi @Kyle Weaver<mailto:kcwea...@google.com> According to this doc<https://beam.apache.org/documentation/io/built-in/snowflake/>: --runner= From: Kyle Weaver mailto:kcwea...@google.com>> Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" mailto:user@beam.apache.org>> Date: Thursday, May 6, 2021 at 12:01 PM To: "user@beam.apache.org<mailto:user@beam.apache.org>" mailto:user@beam.apache.org>> Cc: Anuj Gandhi mailto:an...@zillowgroup.com>> Subject: Re: Does SnowflakeIO support spark runner As far as I know, it should be supported (Beam's abstract model means IOs usually "just work" on all runners). What makes you think it isn't supported? On Thu, May 6, 2021 at 11:52 AM Tao Li mailto:t...@zillow.com>> wrote: Hi Beam community, Does SnowflakeIO support spark runner? Seems like only direct runner and dataflow runner are supported.. Thanks!
Re: Does SnowflakeIO support spark runner
Yeah, I'm pretty sure that documentation is just misleading. All of the options from --runner onward are runner-specific and don't have anything to do with Snowflake, so they should probably be removed from the doc. On Thu, May 6, 2021 at 12:06 PM Tao Li wrote: > Hi @Kyle Weaver > > > > According to this doc > <https://beam.apache.org/documentation/io/built-in/snowflake/>: > --runner= > > > > *From: *Kyle Weaver > *Reply-To: *"user@beam.apache.org" > *Date: *Thursday, May 6, 2021 at 12:01 PM > *To: *"user@beam.apache.org" > *Cc: *Anuj Gandhi > *Subject: *Re: Does SnowflakeIO support spark runner > > > > As far as I know, it should be supported (Beam's abstract model means IOs > usually "just work" on all runners). What makes you think it isn't > supported? > > > > On Thu, May 6, 2021 at 11:52 AM Tao Li wrote: > > Hi Beam community, > > > > Does SnowflakeIO support spark runner? Seems like only direct runner and > dataflow runner are supported.. > > > > Thanks! > >
Re: Does SnowflakeIO support spark runner
Hi @Kyle Weaver<mailto:kcwea...@google.com> According to this doc<https://beam.apache.org/documentation/io/built-in/snowflake/>: --runner= From: Kyle Weaver Reply-To: "user@beam.apache.org" Date: Thursday, May 6, 2021 at 12:01 PM To: "user@beam.apache.org" Cc: Anuj Gandhi Subject: Re: Does SnowflakeIO support spark runner As far as I know, it should be supported (Beam's abstract model means IOs usually "just work" on all runners). What makes you think it isn't supported? On Thu, May 6, 2021 at 11:52 AM Tao Li mailto:t...@zillow.com>> wrote: Hi Beam community, Does SnowflakeIO support spark runner? Seems like only direct runner and dataflow runner are supported.. Thanks!
Re: Does SnowflakeIO support spark runner
As far as I know, it should be supported (Beam's abstract model means IOs usually "just work" on all runners). What makes you think it isn't supported? On Thu, May 6, 2021 at 11:52 AM Tao Li wrote: > Hi Beam community, > > > > Does SnowflakeIO support spark runner? Seems like only direct runner and > dataflow runner are supported.. > > > > Thanks! >
Does SnowflakeIO support spark runner
Hi Beam community, Does SnowflakeIO support spark runner? Seems like only direct runner and dataflow runner are supported.. Thanks!
Re: [Question] Docker and File Errors with Spark PortableRunner
Hi Michael, Your problems in 1. and 2. are related to the artifact staging workflow, where Beam tries to copy your pipeline’s dependencies to the workers. When artifacts cannot be fetched because of file system or other issues, the workers cannot be started successfully. In this case, your pipeline depends on the pickled main session from estimate_pi.py [1]. In order for artifact staging to work, the job server’s --artifacts-dir must be accessible by the Spark worker.* Since you start your job server in a Docker container, /users/mkuchnik/staging is hidden inside that Docker container’s filesystem, which is not accessible from your network filesystem. You mentioned in 2. that you tried mounting the directory to the [worker (?)] containers, but have you tried mounting that directory to the job server container? Thanks, Kyle * It looks like this is unclear in current documentation, so I will edit it. [1] https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/sdks/python/apache_beam/examples/complete/estimate_pi.py#L118
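Kyle's suggestion about making the staging directory visible to the job server container can be sketched as follows. The image name, tag, and Spark master URL are illustrative (the path comes from the question); the flags match the standard Beam Spark job server entrypoint, but check them against your Beam version:

```
docker run --net=host \
  -v /users/mkuchnik/staging:/users/mkuchnik/staging \
  apache/beam_spark_job_server:latest \
  --spark-master-url=spark://localhost:7077 \
  --artifacts-dir=/users/mkuchnik/staging
```

With the same host path mounted at the same location inside the container, the artifact paths the job server records stay valid for the Spark workers that read them over the shared filesystem.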
Re: Is there a perf comparison between Beam (on spark) and native Spark?
Thanks @Alexey Romanenko<mailto:aromanenko@gmail.com> for this info. Do we have a rough idea how Beam (on Spark) compares with native Spark, for example using TPC-DS or any other benchmark? I am just wondering whether running Beam SQL on the Spark runner will have a similar processing time to Spark SQL. Thanks! From: Alexey Romanenko Reply-To: "user@beam.apache.org" Date: Tuesday, March 23, 2021 at 12:58 PM To: "user@beam.apache.org" Subject: Re: Is there a perf comparison between Beam (on spark) and native Spark? There is an extension in Beam to support TPC-DS benchmark [1] that basically runs TPC-DS SQL queries via Beam SQL. Though, I’m not sure if it runs regularly and, IIRC (when I took a look on this last time, maybe I’m mistaken), it requires some adjustments to run on any other runners than Dataflow. Also, when I tried to run it on SparkRunner many queries failed because of different reasons [2]. I believe that if we will manage to make it running for most of the queries on any runner then it will be a good addition to Nexmark benchmark that we have for now since TPC-DS results can be used to compare with other data processing systems as well.
[1] https://github.com/apache/beam/tree/master/sdks/java/testing/tpcds [2] https://issues.apache.org/jira/browse/BEAM-9891 On 22 Mar 2021, at 18:00, Tao Li mailto:t...@zillow.com>> wrote: Hi Beam community, I am wondering if there is a doc to compare perf of Beam (on Spark) and native Spark for batch processing? For example using the TPC-DS benchmark. I did find some relevant links like this<https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf> but it’s old and it mostly covers the streaming scenarios. Thanks!
Re: Is there a perf comparison between Beam (on spark) and native Spark?
There is an extension in Beam to support TPC-DS benchmark [1] that basically runs TPC-DS SQL queries via Beam SQL. Though, I’m not sure if it runs regularly and, IIRC (when I took a look on this last time, maybe I’m mistaken), it requires some adjustments to run on any other runners than Dataflow. Also, when I tried to run it on SparkRunner many queries failed because of different reasons [2]. I believe that if we will manage to make it running for most of the queries on any runner then it will be a good addition to Nexmark benchmark that we have for now since TPC-DS results can be used to compare with other data processing systems as well. [1] https://github.com/apache/beam/tree/master/sdks/java/testing/tpcds [2] https://issues.apache.org/jira/browse/BEAM-9891 > On 22 Mar 2021, at 18:00, Tao Li wrote: > > Hi Beam community, > > I am wondering if there is a doc to compare perf of Beam (on Spark) and > native spark for batch processing? For example using TPCDS benmark. > > I did find some relevant links like this > <https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf> > but it’s old and it mostly covers the streaming scenarios. > > Thanks!
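For reference, the TPC-DS extension mentioned above is driven through Gradle, similarly to the Nexmark suite. A sketch of an invocation against the Spark runner — the property and argument names below are from memory and may differ between Beam versions, so check the module's README first:

```
./gradlew :sdks:java:testing:tpcds:run \
  -Ptpcds.runner=":runners:spark:3" \
  -Ptpcds.args="--runner=SparkRunner --dataSize=1GB --queries=3 --tpcParallel=1"
```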
Re: Is there a perf comparison between Beam (on spark) and native Spark?
+Kyle Weaver Kyle, do you happen to have some information here? On Mon, Mar 22, 2021 at 10:00 AM Tao Li wrote: > Hi Beam community, > > > > I am wondering if there is a doc to compare perf of Beam (on Spark) and > native spark for batch processing? For example using TPCDS benmark. > > > > I did find some relevant links like this > <https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf> > but it’s old and it mostly covers the streaming scenarios. > > > > Thanks! >
Is there a perf comparison between Beam (on spark) and native Spark?
Hi Beam community, I am wondering if there is a doc comparing the performance of Beam (on Spark) and native Spark for batch processing, for example using the TPC-DS benchmark. I did find some relevant links like this<https://archive.fosdem.org/2018/schedule/event/nexmark_benchmarking_suite/attachments/slides/2494/export/events/attachments/nexmark_benchmarking_suite/slides/2494/Nexmark_Suite_for_Apache_Beam_(FOSDEM18).pdf> but it’s old and it mostly covers the streaming scenarios. Thanks!
Re: NotSerializableException in Spark runner
Yes Alexey, com.walmart.dataplatform.aorta.river.meta.Payload class is serializable. Strange observation is overall spark job is successful but in executor logs we continuously observed this exception. As code is same across all nodes, how same code is working fine in one node and failing on another node with NotSerializableException. Regards, Ajit Dongre From: Alexey Romanenko Reply to: "user@beam.apache.org" Date: Tuesday, 1 December 2020 at 11:35 PM To: "user@beam.apache.org" Subject: EXT: Re: NotSerializableException in Spark runner Could you make sure that the instance of com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed record, is serializable? On 1 Dec 2020, at 06:06, Ajit Dongre mailto:ajit.don...@walmartlabs.com>> wrote: Hi all, I have pipeline to read data from kafka & write to file. I am using Beam 2.12 with spark runner in java. While executing I am getting below exception : org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: java.io.NotSerializableException: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow Serialization stack: - object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}) - field (class: scala.Tuple2, name: _2, type: class java.lang.Object) - object (class scala.Tuple2, (Tag:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}})) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140) at org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608) at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583) at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583) at scala.Option.map(Option.scala:146) at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60) at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92) at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109) at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.spark.network.util.
Re: NotSerializableException in Spark runner
Could you make sure that the instance of com.walmart.dataplatform.aorta.river.meta.Payload, created from a failed record, is serializable?

> On 1 Dec 2020, at 06:06, Ajit Dongre wrote:
>
> Hi all,
>
> I have pipeline to read data from kafka & write to file. I am using Beam
> 2.12 with spark runner in java. While executing I am getting below exception:
>
> org.apache.spark.network.client.ChunkFetchFailureException: Failure while
> fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}:
> java.io.NotSerializableException:
> org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
> [full stack trace omitted here; it is reproduced in the original message below]
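(Editor's note: the fix above is Java-specific -- Payload must implement java.io.Serializable. For readers hitting the same class of failure from the Python SDK, the analogous requirement is that elements be encodable by their coder, which for the default pickle-based coder means picklable. A minimal sketch with a hypothetical Payload class standing in for the one named in the thread:)

```python
import pickle

# Hypothetical stand-in for the Payload class from the thread: a plain
# class holding only picklable fields round-trips through pickle, which
# is what Beam's default Python coder requires of elements.
class Payload:
    def __init__(self, topic, value):
        self.topic = topic
        self.value = value

p = Payload("events", b"\x00\x01")
restored = pickle.loads(pickle.dumps(p))  # round-trips cleanly
```

If a field is not picklable (an open socket, a client handle), pickling raises at pipeline construction or execution time, much like the NotSerializableException above.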
NotSerializableException in Spark runner
Hi all, I have a pipeline that reads data from Kafka and writes to a file. I am using Beam 2.12 with the Spark runner in Java. While executing I am getting the below exception:

org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1660417857014, chunkIndex=0}: java.io.NotSerializableException: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
	- object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}})
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (Tag:49#bb20b45fd4d95138>,TimestampedValueInGlobalWindow{value=com.walmart.dataplatform.aorta.river.meta.Payload@727f8c85, timestamp=2020-10-07T09:02:16.340Z, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}))
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:193)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:608)
	at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
	at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:583)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:583)
	at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:377)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
	at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
	at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
	at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:92)
	at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:137)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChan
Re: is apache beam go sdk supported by spark runner?
Yes, it should be for batch (just like for Python). There is ongoing work to make it work for streaming as well.

On Sat, Nov 21, 2020 at 2:57 PM Meriem Sara wrote:
>
> Hello everyone. I am trying to use Apache Beam with Golang to execute a data
> processing workflow using Apache Spark. However, I am not sure whether the Go SDK
> is supported by Apache Spark. Could you please provide us with more
> information?
>
> Thank you
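(Editor's note: the portable path the reply alludes to is the same one used for Python. A minimal sketch of the options involved, using the job server's default port mentioned elsewhere in this digest; the real submission would hand these to PipelineOptions, shown only in a comment so the sketch stays stdlib-only:)

```python
# Options for targeting a Spark job server through the portable runner --
# the same route suggested above for the Go SDK. The endpoint assumes a
# job server running locally on its default port, 8099.
portable_opts = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",
]

# With the Beam Python SDK installed, a pipeline would use them like:
#   import apache_beam as beam
#   from apache_beam.options.pipeline_options import PipelineOptions
#   with beam.Pipeline(options=PipelineOptions(portable_opts)) as p:
#       ...
```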
Re: is apache beam go sdk supported by apache Spark runner
As far as I can say for the Spark Runner, natively it supports only the Java SDK. I'm far from a Go SDK expert, but I think you can run a pipeline written with the Go SDK using a Portable Runner and the Spark Runner Job Server, like it's possible to do for Python SDK pipelines. I'm not sure if it's already officially supported, but I believe the Go SDK people can provide more details on this. Actually, I moved forward and tried to run it on my side, but with no success so far.

1) Run a Spark Runner Job Server:

$ docker run --net=host apache/beam_spark_job_server:latest
20/11/23 17:50:04 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: ArtifactStagingService started on localhost:8098
20/11/23 17:50:04 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
20/11/23 17:50:04 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: JobService started on localhost:8099

2) Run the stringsplit example on master, and I get this protobuf error:

$ go run sdks/go/examples/stringsplit/stringsplit.go --runner=universal --endpoint=localhost:8099
panic: proto: file "v1.proto" is already registered
See https://developers.google.com/protocol-buffers/docs/reference/go/faq#namespace-conflict

goroutine 1 [running]:
google.golang.org/protobuf/reflect/protoregistry.glob..func1(0x1d85000, 0xc00039c700, 0x1d68780, 0xc0003a4430, 0xc00039c700)
	/Users/aromanenko/go/src/google.golang.org/protobuf/reflect/protoregistry/registry.go:38 +0x21f
google.golang.org/protobuf/reflect/protoregistry.(*Files).RegisterFile(0xc80520, 0x1d87ac0, 0xc00039c700, 0x0, 0x0)
	/Users/aromanenko/go/src/google.golang.org/protobuf/reflect/protoregistry/registry.go:111 +0xb72
google.golang.org/protobuf/internal/filedesc.Builder.Build(0x0, 0x0, 0xc000230a00, 0x12a, 0x200, 0x10001, 0x0, 0x1d6f3a0, 0xc3c450, 0x1d79460, ...)
	/Users/aromanenko/go/src/google.golang.org/protobuf/internal/filedesc/build.go:113 +0x1aa
github.com/golang/protobuf/proto.RegisterFile(0x1c60292, 0x8, 0x23708a0, 0xe2, 0xe2)
	/Users/aromanenko/go/src/github.com/golang/protobuf/proto/registry.go:47 +0x147
github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1.init.1()
	/Users/aromanenko/go/src/github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1/v1.pb.go:115 +0x5a
exit status 2

3) My Go version:

$ go version
go version go1.15.5 darwin/amd64

I wonder if it's a known issue or if it's something wrong with my environment?

> On 22 Nov 2020, at 00:05, Meriem Sara wrote:
>
> Hello everyone. I am trying to use Apache Beam with Golang to execute a data
> processing workflow using Apache Spark. However, I am not sure whether the Go SDK
> is supported by Apache Spark. Could you please provide us with more
> information?
>
> Thank you
is apache beam go sdk supported by apache Spark runner
Hello everyone. I am trying to use Apache Beam with Golang to execute a data processing workflow using Apache Spark. However, I am not sure whether the Go SDK is supported by Apache Spark. Could you please provide us with more information? Thank you
is apache beam go sdk supported by spark runner?
Hello everyone. I am trying to use Apache Beam with Golang to execute a data processing workflow using Apache Spark. However, I am not sure whether the Go SDK is supported by Apache Spark. Could you please provide us with more information? Thank you
Re: Spark Portable Runner + Docker
Hi Alex -- Please see the details you are looking for. I am running a sample pipeline, and my environment is this:

python "SaiStudy - Apache-Beam-Spark.py" --runner=PortableRunner --job_endpoint=192.168.99.102:8099

My Spark is running in a Docker container, and I can see that the JobService is running at 8099. I am getting the following error:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses"
	debug_error_string = "{"created":"@1603539936.53600","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":4090,"referenced_errors":[{"created":"@1603539936.53600","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":394,"grpc_status":14}]}"

When I curl to ip:port, I can see the following error in the docker logs:

Oct 24, 2020 11:34:50 AM org.apache.beam.vendor.grpc.v1p26p0.io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2Exception: Unexpected HTTP/1.x request: GET /
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.readClientPrefaceString(Http2ConnectionHandler.java:302)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:239)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Help, please.

On 2020/10/28 15:37:22, Alexey Romanenko wrote:
> Hi Ramesh,
>
> By “+ Docker” do you mean the Docker SDK Harness or running Spark in Docker?
> For the former I believe it works fine.
>
> Could you share more details of what kind of error you are facing?
>
> > On 27 Oct 2020, at 21:10, Ramesh Mathikumar wrote:
> > > Hi Group -- Has anyone got this to work? For me it does not work either in the
> > > IDE or in Colab. What's the community take on this one?
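(Editor's note: two separate things are happening in the message above. The server-side "Unexpected HTTP/1.x request: GET /" is expected when curl, an HTTP/1.1 client, probes a gRPC port -- the job server speaks HTTP/2 only, so the log actually proves the port is open. The client-side StatusCode.UNAVAILABLE means the Python process could not reach the endpoint at all, typically a Docker networking issue. A closer reachability check than curl is a raw TCP connect; the helper names below are hypothetical, and the host and port come from the thread:)

```python
import socket

def endpoint_parts(endpoint):
    """Split a 'host:port' job endpoint string into (host, port)."""
    host, _, port = endpoint.rpartition(":")
    return host, int(port)

def tcp_reachable(host, port, timeout=2.0):
    """True if a plain TCP connection to host:port succeeds within timeout.

    This matches what gRPC needs at the transport level, unlike an
    HTTP/1.1 GET from curl, which a gRPC (HTTP/2-only) server rejects.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

host, port = endpoint_parts("192.168.99.102:8099")
```

If `tcp_reachable(host, port)` is False from the machine running the pipeline, the UNAVAILABLE error is a networking problem (wrong IP, unpublished Docker port), not a Beam problem.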
Re: Spark Portable Runner + Docker
Hi Ramesh,

By “+ Docker” do you mean the Docker SDK Harness or running Spark in Docker? For the former, I believe it works fine.

Could you share more details of what kind of error you are facing?

> On 27 Oct 2020, at 21:10, Ramesh Mathikumar wrote:
>
> Hi Group -- Has anyone got this to work? For me it does not work either in the IDE
> or in Colab. What's the community take on this one?
Spark Portable Runner + Docker
Hi Group -- Has anyone got this to work? For me it does not work either in the IDE or in Colab. What's the community take on this one?
Re: Beam + Spark Portable Runner
On the Spark runner webpage [1], we recommend running the job server container with --net=host so all the job server's ports are exposed to the host. However, host networking is not available if you're using Windows, so you would have to expose individual ports instead. The job server uses ports 8099, 8098, and 8097, so you can add them to your Docker command like this:

docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077

[1] https://beam.apache.org/documentation/runners/spark/

On Sat, Oct 24, 2020 at 4:12 PM Ramesh Mathikumar wrote:
> It's very strange.
>
> I run the same code in Colab and it works fine. Just to ensure it's not
> using a local runner, I closed the Flink connector and then it came back
> as connection refused. When I ran it again, now with Flink running again,
> it worked as a doddle.
>
> On 2020/10/24 22:56:18, Ankur Goenka wrote:
> > Can you try running
> >
> > java -jar C:\\Users\\rekharamesh/.apache_beam/cache/jars\\beam-runners-flink-1.8-job-server-2.24.0.jar --flink-master http://localhost:8081 --artifacts-dir C:\\Users\\REKHAR~1\\AppData\\Local\\Temp\\beam-temp4r_emy7q\\artifactskmzxyxkl --job-port 57115 --artifact-port 0 --expansion-port 0
> >
> > to see why the job server is failing.
> >
> > On Sat, Oct 24, 2020 at 3:52 PM Ramesh Mathikumar <meetr...@googlemail.com> wrote:
> > > Hi Ankur,
> > >
> > > Thanks for the prompt response. I suspected a similar issue, so to validate
> > > that I ran a local cluster of Flink. My parameters are as follows:
> > >
> > > options = PipelineOptions([
> > >     "--runner=FlinkRunner",
> > >     "--flink_version=1.8",
> > >     "--flink_master=localhost:8081",
> > >     "--environment_type=LOOPBACK"
> > > ])
> > >
> > > And when I run it, it does not even reach the cluster - instead it bombs
> > > with the following message:
> > >
> > > WARNING:root:Make sure that locally built Python SDK docker image has
> > > Python 3.6 interpreter.
> > > ERROR:apache_beam.utils.subprocess_server:Starting job service with
> > > ['java', '-jar', 'C:\\Users\\rekharamesh/.apache_beam/cache/jars\\beam-runners-flink-1.8-job-server-2.24.0.jar', '--flink-master', 'http://localhost:8081', '--artifacts-dir', 'C:\\Users\\REKHAR~1\\AppData\\Local\\Temp\\beam-temp4r_emy7q\\artifactskmzxyxkl', '--job-port', '57115', '--artifact-port', '0', '--expansion-port', '0']
> > > ERROR:apache_beam.utils.subprocess_server:Error bringing up service
> > > Traceback (most recent call last):
> > >   File "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\utils\subprocess_server.py", line 88, in start
> > >     'Service failed to start up with error %s' % self._process.poll())
> > > RuntimeError: Service failed to start up with error 1
> > > Traceback (most recent call last):
> > >   File "SaiStudy - Apache-Beam-Spark.py", line 34, in
> > >     | 'Write results' >> beam.io.WriteToText(outputs_prefix)
> > >   File "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\pipeline.py", line 555, in __exit__
> > >     self.result = self.run()
> > >   File "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\pipeline.py", line 534, in run
> > >     return self.runner.run_pipeline(self, self._options)
> > >   File "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\runners\portability\flink_runner.py", line 49, in run_pipeline
> > >     return super(FlinkRunner, self).run_pipeline(pipeline, options)
> > >   File "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 388, in run_pipeline
> > >     job_service_handle = self.create_job_service(options)
> > >   File "C:\Users\rekharamesh\AppData\Local\Programs\Python\Python36-32\lib\site-packages\apache_beam\runners\portability\portable_runner.py", line 304, in create_job_service
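(Editor's note: the port assignments in the reply above can be captured in one place for building the docker -p flags on Windows. The dict name is hypothetical; the role labels match the JobServerDriver startup log quoted earlier in this digest:)

```python
# Ports the Beam job server listens on, per the reply above. Role labels
# are from the JobServerDriver startup log (JobService on 8099,
# ArtifactStagingService on 8098, Java ExpansionService on 8097).
JOB_SERVER_PORTS = {
    8099: "JobService",
    8098: "ArtifactStagingService",
    8097: "Java ExpansionService",
}

# Without --net=host (e.g. on Windows), publish each port individually:
publish_flags = " ".join(
    f"-p {p}:{p}" for p in sorted(JOB_SERVER_PORTS, reverse=True)
)
# -> "-p 8099:8099 -p 8098:8098 -p 8097:8097"
```

These are exactly the flags in the docker run command shown in the reply.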