SparkRunner

2021-07-01 Thread Trevor Kramer
Hi everyone. We have a Beam pipeline running on EMR using the portable Spark
runner. If we use 100% on-demand Core nodes, the pipeline finishes
successfully. If we run a mix of on-demand Core nodes and spot Task nodes,
the pipeline fails every time with the error below. Does Beam have
resiliency against losing nodes, and does it schedule with awareness of Core
vs. Task nodes?

Caused by: java.lang.RuntimeException:
org.apache.spark.SparkException: Job aborted due to stage failure: A
shuffle map stage with indeterminate output was failed and retried.
However, Spark cannot rollback the ShuffleMapStage 5 to re-process the
input data, and has to fail this job. Please eliminate the
indeterminacy by checkpointing the RDD before repartition and try
again.
at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:199)
at org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
... 5 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: A shuffle map stage with indeterminate output was failed and
retried. However, Spark cannot rollback the ShuffleMapStage 5 to
re-process the input data, and has to fail this job. Please eliminate
the indeterminacy by checkpointing the RDD before repartition and try
again.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2136)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2124)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2123)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1674)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1666)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
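
The suggestion in the message is a Spark-level one: in a hand-written Spark
job, checkpointing an RDD to reliable storage before repartitioning removes
the indeterminacy it complains about. With the Beam Spark runner the RDDs
are built by the runner rather than by hand, so the sketch below only
illustrates the underlying Spark concept (the checkpoint path is
hypothetical):

from pyspark import SparkContext

sc = SparkContext(appName='checkpoint-before-repartition')
# The checkpoint directory must be on storage that survives node loss.
sc.setCheckpointDir('hdfs:///tmp/spark-checkpoints')  # hypothetical path

rdd = sc.parallelize(range(1000))
rdd.checkpoint()   # mark the RDD for checkpointing before the shuffle
rdd.count()        # an action forces the checkpoint to actually be written
repartitioned = rdd.repartition(10)

Losing spot Task nodes mid-shuffle appears to be exactly the scenario that
triggers this rollback limitation, since recomputed map output may differ
between attempts.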


Thanks,


Trevor


Re: spark-submit and the portable runner

2021-06-24 Thread Trevor Kramer
Thanks. That was the issue. The latest release of Beam indicates it
supports Spark 3, and I see references in the code to Hadoop 3.2.1. Is it
possible to configure Beam to run on Hadoop 3.2.1?
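
For reference, one way to line the versions up (assuming a Beam release
that publishes a Spark 3 job server artifact) is to point the runner at
that jar explicitly with the spark_job_server_jar pipeline option. A
sketch, with a hypothetical jar path:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=SparkRunner',
    '--environment_type=DOCKER',
    '--environment_config=path-to-image:latest',
    # Hypothetical path; point at a job server built against Spark 3.
    '--spark_job_server_jar=/path/to/beam-runners-spark-3-job-server.jar',
    '--output_executable_path=output.jar',
])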

Trevor

On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver wrote:

> Looks like a version mismatch between Hadoop dependencies [1]. Beam's
> Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
> Spark and Hadoop versions is your cluster using?
>
> [1]
> https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
> [2] https://issues.apache.org/jira/browse/BEAM-12094
>
> On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer wrote:
>
>> Does anyone have an example of using spark-submit to run a Beam job using
>> the portable runner? The documentation indicates this is possible but
>> doesn't give an example of how to do it.
>>
>> I am using the following pipeline options to generate the jar:
>>
>> options = PipelineOptions(['--runner=SparkRunner',
>>                            '--environment_type=DOCKER',
>>                            '--environment_config=path-to-image:latest',
>>                            '--output_executable_path=output.jar'])
>>
>> and am trying to run it with
>>
>> spark-submit --master yarn --deploy-mode cluster \
>>   --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar
>>
>>
>> but I get the following error
>>
>>
>> Exception in thread "main" java.lang.IllegalAccessError: class
>> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
>> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)
>> at org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)
>> at scala.Option.getOrElse(Option.scala:189)
>> at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)
>> at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
>> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
>> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>> at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>


spark-submit and the portable runner

2021-06-21 Thread Trevor Kramer
Does anyone have an example of using spark-submit to run a Beam job using
the portable runner? The documentation indicates this is possible but
doesn't give an example of how to do it.

I am using the following pipeline options to generate the jar:

options = PipelineOptions(['--runner=SparkRunner',
                           '--environment_type=DOCKER',
                           '--environment_config=path-to-image:latest',
                           '--output_executable_path=output.jar'])

and am trying to run it with

spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar


but I get the following error


Exception in thread "main" java.lang.IllegalAccessError: class
org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)
at org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)
at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


CANCELLED: call already cancelled

2021-06-14 Thread Trevor Kramer
Hello,

I am trying to run a Beam pipeline on Flink using EMR and am consistently
getting the error below. I found a reference to a bug report saying this
issue was fixed in Flink 1.11, but I am using 1.12.1.

Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException:
CANCELLED: call already cancelled. Use
ServerCallStreamObserver.setOnCancelHandler() to disable this exception
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:103)
at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:115)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:718)
at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:362)
at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:267)
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)

Is there a more solid runner for running Beam jobs in an AWS environment?

Thanks,

Trevor


No data sinks have been created yet.

2021-06-09 Thread Trevor Kramer
Hello Beam community,

I am getting the following error running a Beam pipeline on Flink.

RuntimeError: Pipeline BeamApp failed in state FAILED:
java.lang.RuntimeException: No data sinks have been created yet. A program
needs at least one sink that consumes data. Examples are writing the data
set or printing it.

Here is my pipeline, which I believe has a sink at the end of it. What am
I missing?

with beam.Pipeline(options=options) as p:
    (p
     | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
     | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
     | 'Flatten' >> beam.FlatMap(lambda x: x)
     | 'Standardize' >> beam.Map(standardize)
     | 'Make FPs' >> beam.Map(calculate_fps)
     | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
     | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
           [('fp', pyarrow.list_(pyarrow.int64(), 2048))])))
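
One way to narrow this down is to strip the pipeline to a Create plus the
write. If the minimal sketch below (paths hypothetical, same Flink options
assumed) still reports no sinks, the problem is in how the runner translates
the write rather than in the upstream transforms:

import apache_beam as beam
import pyarrow
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()  # reuse the same Flink options as the full pipeline

with beam.Pipeline(options=options) as p:
    (p
     | 'Make Dict' >> beam.Create([{'fp': [0] * 2048}])
     | 'Write Parquet' >> WriteToParquet(
           's3://some-path',  # hypothetical output prefix
           pyarrow.schema([('fp', pyarrow.list_(pyarrow.int64(), 2048))])))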


Thanks,


Trevor


Re: Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype

2021-06-05 Thread Trevor Kramer
I made a pull request to fix this issue:

https://github.com/apache/beam/pull/14953

On Sat, Jun 5, 2021 at 7:13 AM Ismaël Mejía wrote:

> Hello, seems to be a known issue:
> https://issues.apache.org/jira/browse/BEAM-10430
>
> I don't know, however, if someone has already found a proper fix or
> workaround.
>
>
> On Fri, Jun 4, 2021 at 8:22 PM Trevor Kramer wrote:
> >
> > Relating to my earlier message, I sometimes get this error instead.
> >
> > java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module:
> > Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype
> >
> > Searching Google, I have found some indications that this might be a
> > version conflict with Jackson. I don't see how to resolve this with Python
> > code. Does anyone have Beam running on Flink on EMR?
> >
> > Thanks,
> >
> > Trevor
> >
>


Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a subtype

2021-06-04 Thread Trevor Kramer
Relating to my earlier message, I sometimes get this error instead.

java.util.ServiceConfigurationError: com.fasterxml.jackson.databind.Module:
Provider com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule not a
subtype

Searching Google, I have found some indications that this might be a version
conflict with Jackson. I don't see how to resolve this with Python code.
Does anyone have Beam running on Flink on EMR?

Thanks,

Trevor


java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.runners.core.construction.SerializablePipelineOptions

2021-06-04 Thread Trevor Kramer
Hello,

I am new to Beam and am trying to get a simple example pipeline running on
EMR using the Flink Runner. I am getting the following error message.

Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.runners.core.construction.SerializablePipelineOptions
at java.io.ObjectStreamClass.hasStaticInitializer(Native Method)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1955)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:275)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:273)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:272)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:694)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:78)
at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1246)
at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:599)
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:318)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:102)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)

Here is my pipeline:

def run(port):
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.12",
        f"--flink_master=127.0.0.1:{port}",
        "--environment_type=DOCKER",
        "--environment_config=docker-image",
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create words' >> beam.Create(['to be or not to be'])
         | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
         | 'Write to file' >> WriteToText('test.txt'))


if __name__ == "__main__":
    run(8081)


The master URL is localhost because I am using an SSH tunnel (not shown).
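
As a sanity check independent of the cluster, the same pipeline can be run
on the DirectRunner first; if it passes locally, the failure is in the
Flink/EMR setup rather than in the pipeline code. A minimal sketch:

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner'])) as p:
    (p
     | 'Create words' >> beam.Create(['to be or not to be'])
     | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
     | 'Write to file' >> WriteToText('test.txt'))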

Has anyone seen this error before? I am running on EMR 6.3.0 and Flink 1.12.1.


Thanks,


Trevor



INFO:apache_beam.runners.portability.fn_api_runner.translations:
 
INFO:apache_beam.runners.portability.fn_api_runner.translations:
 
INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol
scheme to flink_master parameter: http://127.0.0.1:64831
INFO:apache_beam.utils.subprocess_server:Using cached job server jar
from 
https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.12-job-server/2.29.0/beam-runners-flink-1.12-job-server-2.29.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
'-jar' 
'/Users/tkramer/.apache_beam/cache/jars/beam-runners-flink-1.12-job-server-2.29.0.jar'
'--flink-master' 'http://127.0.0.1:64831' '--artifacts-dir'
'/var/folders/np/lbz94j017px4t8zktt86pbrcgp/T/beam-tempuhv3qq6j/artifactsz7a040z7'
'--job-port' '64832' '--artifact-port' '0' '--expansion-port' '0']
INFO:apache_beam.utils.subprocess_server:b'Jun 04, 2021 8:32:08 AM
org.apache.beam.runners.jobsubmission.JobServerDriver
createArtifactStagingService'
INFO:apache_beam.utils.subprocess_server:b'INFO:
ArtifactStagingService started on localhost:64839'
INFO:apache_beam.utils.subprocess_server:b'Jun 04, 2021 8:32:08 AM
org.apache.beam.runners.jobsubmission.JobServerDriver
createExpansionService'
INFO:apache_beam.utils.subprocess_server:b'INFO: Java ExpansionService
started on localhost:64840'
INFO:apache_beam.utils.subprocess_server:b'Jun 04, 2021 8:32:08 AM
org.apache.beam.r