SparkRunner
Hi everyone. We have a Beam pipeline running on EMR using the portable Spark runner. If we use 100% on-demand Core nodes, the pipeline finishes successfully. If we run a mix of on-demand Core nodes and spot Task nodes, the pipeline fails every time with the error below. Does Beam have any resiliency against losing nodes, and does it schedule with awareness of Core vs. Task nodes?

Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: A shuffle map stage with indeterminate output was failed and retried. However, Spark cannot rollback the ShuffleMapStage 5 to re-process the input data, and has to fail this job. Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again.
        at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
        at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:199)
        at org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
        ... 5 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: A shuffle map stage with indeterminate output was failed and retried. However, Spark cannot rollback the ShuffleMapStage 5 to re-process the input data, and has to fail this job. Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again.
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2136)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2124)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2123)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1674)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1666)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)

Thanks,

Trevor
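[Editor's note] The failure above is Spark refusing to retry a shuffle map stage whose output is indeterminate after the executors holding that output were lost, which is exactly what happens when a spot Task node is reclaimed. One avenue to explore, as a hedged sketch only: Spark 3.1+ has executor decommissioning, which can migrate shuffle and RDD blocks off a node before it goes away. The property names below are Spark's, but whether a given EMR release and the portable Beam runner honor them is an assumption to verify, not a confirmed fix for this thread's error.

```shell
# Sketch (assumes Spark 3.1+; untested with the portable Beam runner):
# migrate shuffle/RDD blocks away from executors on reclaimed spot nodes
# instead of losing them and forcing an indeterminate-stage retry.
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true \
  --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar
```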
Re: spark-submit and the portable runner
Thanks. That was the issue. The latest release of Beam indicates it supports Spark 3, and I find references in the code to Hadoop 3.2.1. Is it possible to configure Beam to run on Hadoop 3.2.1?

Trevor

On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver wrote:
> Looks like a version mismatch between Hadoop dependencies [1]. Beam's
> Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
> Spark and Hadoop versions is your cluster using?
>
> [1] https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
> [2] https://issues.apache.org/jira/browse/BEAM-12094
>
> On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer wrote:
>> Does anyone have an example of using spark-submit to run a beam job using
>> the portable runner? The documentation indicates this is possible but
>> doesn't give an example of how to do it.
>>
>> [remainder of quoted message trimmed; see the original post,
>> "spark-submit and the portable runner", below]
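[Editor's note] Since the Python wrapper pins its default job server to Spark 2.4.8, one way to experiment with a newer Spark/Hadoop combination is to point the SDK at a job server jar built against the cluster's versions. This is a sketch under assumptions: it presumes the `--spark_job_server_jar` pipeline option is available in your Beam release, and the jar path is a placeholder, not a real artifact name.

```python
# Sketch: override the default (Spark 2.4.8-pinned) job server jar with one
# built against the cluster's Spark/Hadoop versions. The jar path is a
# hypothetical placeholder.
options_args = [
    "--runner=SparkRunner",
    "--environment_type=DOCKER",
    "--environment_config=path-to-image:latest",
    # hypothetical locally built jar matching the cluster's Spark version
    "--spark_job_server_jar=/path/to/custom-spark-job-server.jar",
    "--output_executable_path=output.jar",
]
```

These arguments would then be passed to `PipelineOptions(options_args)` exactly as in the original post.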
spark-submit and the portable runner
Does anyone have an example of using spark-submit to run a beam job using the portable runner? The documentation indicates this is possible but doesn't give an example of how to do it.

I am using the following pipeline options to generate the jar:

options = PipelineOptions(['--runner=SparkRunner',
                           '--environment_type=DOCKER',
                           '--environment_config=path-to-image:latest',
                           '--output_executable_path=output.jar'
                           ])

and am trying to run it with

spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar

but I get the following error:

Exception in thread "main" java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)
        at org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
CANCELLED: call already cancelled
Hello,

I am trying to run a Beam pipeline on Flink using EMR. I am consistently getting these errors. I found a reference to a bug report that said this issue was fixed in 1.11; I am using 1.12.1.

Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
        at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
        at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:103)
        at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:115)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:718)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:362)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:267)
        at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)

Is there a more solid runner for running Beam jobs in an AWS environment?

Thanks,

Trevor
No data sinks have been created yet.
Hello Beam community,

I am getting the following error running a Beam pipeline on Flink.

RuntimeError: Pipeline BeamApp failed in state FAILED: java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.

Here is my pipeline, which I believe has a sink at the end of it. What am I missing?

with beam.Pipeline(options=options) as p:
    (p
     | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
     | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
     | 'Flatten' >> beam.FlatMap(lambda x: x)
     | 'Standardize' >> beam.Map(standardize)
     | 'Make FPs' >> beam.Map(calculate_fps)
     | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
     | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
         [('fp', pyarrow.list_(pyarrow.int64(), 2048))]
     ))
    )

Thanks,

Trevor
Re: Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
I made a pull request to fix this issue: https://github.com/apache/beam/pull/14953

On Sat, Jun 5, 2021 at 7:13 AM Ismaël Mejía wrote:
> Hello, seems to be a known issue:
> https://issues.apache.org/jira/browse/BEAM-10430
>
> I don't know however if someone has already found a proper fix or
> workaround.
>
> On Fri, Jun 4, 2021 at 8:22 PM Trevor Kramer wrote:
>> Relating to my earlier message, I sometimes get this error instead.
>>
>> java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
>>
>> Searching Google, I have found some indications that this might be a
>> version conflict with Jackson. I don't see how to resolve this with Python
>> code. Does anyone have Beam running on Flink on EMR?
>>
>> Thanks,
>>
>> Trevor
Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
Relating to my earlier message, I sometimes get this error instead.

java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module: Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype

Searching Google, I have found some indications that this might be a version conflict with Jackson. I don't see how to resolve this with Python code. Does anyone have Beam running on Flink on EMR?

Thanks,

Trevor
java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions
Hello,

I am new to Beam and am trying to get a simple example pipeline running on EMR using the Flink Runner. I am getting the following error message.

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions
        at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
        at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1955)
        at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
        at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:78)
        at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1246)
        at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:599)
        at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:318)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:102)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)

Here is my pipeline:

def run(port):
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.12",
        f"--flink_master=127.0.0.1:{port}",
        "--environment_type=DOCKER",
        '--environment_config=docker-image'
    ])
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create words' >> beam.Create(['to be or not to be'])
         | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
         | 'Write to file' >> WriteToText('test.txt')
        )

if __name__ == "__main__":
    run(8081)

The master URL is localhost because I am using an SSH tunnel (not shown). Has anyone seen this error before? I am running on EMR 6.3.0 and Flink 1.12.1.

Thanks,

Trevor

INFO:apache_beam.runners.portability.fn_api_runner.translations:
INFO:apache_beam.runners.portability.fn_api_runner.translations:
INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol scheme to flink_master parameter: http://127.0.0.1:64831
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.12-job-server/2.29.0/beam-runners-flink-1.12-job-server-2.29.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/Users/tkramer/.apache_beam/cache/jars/beam-runners-flink-1.12-job-server-2.29.0.jar' '--flink-master' 'http://127.0.0.1:64831' '--artifacts-dir' '/var/folders/np/lbz94j017px4t8zktt86pbrcgp/T/beam-tempuhv3qq6j/artifactsz7a040z7' '--job-port' '64832' '--artifact-port' '0' '--expansion-port' '0']
INFO:apache_beam.utils.subprocess_server:b'Jun 04, 2021 8:32:08 AM org.apache.beam.runners.jobsubmission.JobServerDriver createArtifactStagingService'
INFO:apache_beam.utils.subprocess_server:b'INFO: ArtifactStagingService started on localhost:64839'
INFO:apache_beam.utils.subprocess_server:b'Jun 04, 2021 8:32:08 AM org.apache.beam.runners.jobsubmission.JobServerDriver createExpansionService'
INFO:apache_beam.utils.subprocess_server:b'INFO: Java ExpansionService started on localhost:64840'
INFO:apache_beam.utils.subprocess_server:b'Jun 04, 2021 8:32:08 AM org.apache.beam.r
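[Editor's note] A "Could not initialize class" failure during deserialization on the TaskManagers can stem from a mismatch between the job server's Flink version and what the cluster is actually running. Before submitting, it may help to confirm the tunneled endpoint really is the Flink you expect; a sketch only, where host names and ports are placeholders:

```shell
# Example sanity check (host/port are placeholders): tunnel to the Flink
# REST endpoint on the EMR master, then confirm the version the cluster
# reports matches the --flink_version passed to Beam.
ssh -N -L 8081:localhost:8081 hadoop@emr-master-node &
curl -s http://127.0.0.1:8081/config   # response includes a "flink-version" field
```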