We have made some progress parallelizing our Python code with Beam on Spark.
Following your advice, we are using Spark 3.2.1.
The Spark master and worker are connected OK.
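(As a sanity check on versions, the cluster's actual Spark version can be printed on the master/worker machines with the standard CLI:)

```shell
# On the Spark master/worker machines: print the installed Spark version
spark-submit --version
```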
On a third machine (the client), I am running the Docker job server:

$ sudo docker run --net=host apache/beam_spark_job_server:latest \
    --spark-master-url=spark://<SERVER-IP>:7077
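(For reference, a Spark 3 variant of the same launch, pinned to the same release as the Python SDK, would look like the following. The `apache/beam_spark3_job_server` image name and the `2.37.0` tag are assumptions based on Docker Hub naming, not something confirmed in this setup:)

```shell
# Hypothetical alternative: job server image built against Spark 3,
# pinned to the same Beam release as the Python SDK (2.37.0)
sudo docker run --net=host apache/beam_spark3_job_server:2.37.0 \
    --spark-master-url=spark://<SERVER-IP>:7077
```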

Then on the client:
$ python test_beam.py

If it matters, the code in test_beam.py has the following:
    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--save_main_session",
        "--environment_type=DOCKER",
        "--environment_config=docker.io/apache/beam_python3.8_sdk:2.37.0",
    ])
    with beam.Pipeline(options=options) as p:
        lines = (p
                 | 'Create words' >> beam.Create(['this is working'])
                 | 'Add hostname' >> beam.Map(addhost)
                 | 'Split words' >> beam.FlatMap(lambda line: line.split(' '))
                 | 'Build byte array' >> beam.ParDo(ConvertToByteArray())
                 | 'Group' >> beam.GroupBy()  # Do future batching here
                 | 'print output' >> beam.Map(myprint)
                 )


I think I got the versions wrong, because the server log gives the following
(192.168.1.252 is the client IP):

22/04/04 11:51:57 DEBUG TransportServer: New connection accepted for remote 
address /192.168.1.252:53330.
22/04/04 11:51:57 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; 
local class incompatible: stream classdesc serialVersionUID = 
6543101073799644159, local class serialVersionUID = 1574364215946805297
….


In the client logs, I got:

22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Staging 
artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Resolving 
artifacts for 
job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.ref_Environment_default_environment_1.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Getting 1 
artifacts for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.null.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService: Artifacts 
fully staged for job_9a9667dd-898f-4b9b-94b7-2c8b73f0ac27.
22/04/04 11:51:56 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking 
job BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038
22/04/04 11:51:56 INFO org.apache.beam.runners.jobsubmission.JobInvocation: 
Starting job invocation 
BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038
22/04/04 11:51:56 INFO 
org.apache.beam.runners.core.construction.resources.PipelineResources: 
PipelineOptions.filesToStage was not specified. Defaulting to files from the 
classpath: will stage 6 files. Enable logging at DEBUG level to see which files 
will be staged.
22/04/04 11:51:56 INFO 
org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a brand 
new Spark Context.
22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Your hostname, 
spark-ml-client resolves to a loopback address: 127.0.0.1; using 192.168.1.252 
instead (on interface eth0)
22/04/04 11:51:56 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you 
need to bind to another address
22/04/04 11:51:57 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
22/04/04 11:52:57 ERROR 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application has 
been killed. Reason: All masters are unresponsive! Giving up.
22/04/04 11:52:57 WARN 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend: Application ID 
is not initialized yet.
22/04/04 11:52:57 WARN 
org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint: Drop 
UnregisterApplication(null) because has not yet connected to master
22/04/04 11:52:57 WARN org.apache.spark.metrics.MetricsSystem: Stopping a 
MetricsSystem that is not running
22/04/04 11:52:57 ERROR org.apache.spark.SparkContext: Error initializing 
SparkContext.
java.lang.NullPointerException
                at 
org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:64)
                at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:248)
                at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
                at 
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
                at 
org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
                at 
org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)
                at 
org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:118)
                at 
org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
                at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
                at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
                at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
                at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:750)
22/04/04 11:52:57 ERROR org.apache.beam.runners.jobsubmission.JobInvocation: 
Error during job invocation 
BeamApp-root-0404115156-3cc5f493_e0deca3a-6b67-40ad-bfe8-55ba7efd9038.
java.lang.NullPointerException
                at 
org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:64)
                at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:248)
                at org.apache.spark.SparkContext.<init>(SparkContext.scala:510)
                at 
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
                at 
org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
                at 
org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)
                at 
org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:118)
                at 
org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
                at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
                at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
                at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
                at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:750)
