Hi Sean/Mich,

Thanks for the response.
That was the full log; sending it again for reference. I am just running foreach (lambda), which runs pure Python code.

Exception in read_logs : Py4JJavaError
Traceback (most recent call last):
  File "/opt/spark/python/lib/python3.6/site-packages/filename.py", line 42, in read_logs
    data_df.rdd.foreach(lambda x: process_logs(x))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in foreach
    self.mapPartitions(processPartition).count()  # Force evaluation
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 15, 10.244.42.133, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/spark/python/lib/python3.6/site-packages/filename.py", line 10, in <module>
    spark = SparkSession.builder.appName("test").getOrCreate()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
    return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/spark/python/lib/python3.6/site-packages/filename.py", line 10, in <module>
    spark = SparkSession.builder.appName("test").getOrCreate()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
    return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

End Main...

Thanks
Rajat
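The worker-side traceback above points at the actual failure: pickle.loads on the executor imports filename.py, and line 10 of that module (in <module>) runs SparkSession.builder.appName("test").getOrCreate() at import time. Every Python worker therefore tries to launch its own Java gateway, which fails with "Java gateway process exited before sending its port number". A minimal sketch of the usual fix, assuming process_logs and the driver entry point both live in that same filename.py (the input read is a hypothetical stand-in for the real one):

# filename.py -- sketch under the assumption that this module both
# defines process_logs and serves as the job's entry point
from pyspark.sql import SparkSession

def process_logs(row):
    # pure-Python, per-record work; must not touch the SparkSession
    ...

def read_logs(spark):
    data_df = spark.read.json("logs/")   # hypothetical input
    data_df.rdd.foreach(lambda x: process_logs(x))

if __name__ == "__main__":
    # Build the session only on the driver. Executors import this module
    # when unpickling the foreach closure; keeping getOrCreate() under this
    # guard stops them from trying to launch their own Java gateway.
    spark = SparkSession.builder.appName("test").getOrCreate()
    read_logs(spark)

With the builder moved under the __main__ guard, unpickling the module on an executor only sees function definitions and never touches the gateway.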
On Sat, May 8, 2021 at 3:02 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> By YARN mode I meant dealing with issues raised cluster-wide.
>
> From personal experience, I find it easier to trace these sorts of errors
> when I run the code in local mode, as the problem could be related to the
> set-up, and it is easier to track where things go wrong in local mode.
>
> This line
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>
> may be related to the following Stack Overflow question:
>
> py4j.protocol.Py4JJavaError occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe - Stack Overflow
> <https://stackoverflow.com/questions/50064646/py4j-protocol-py4jjavaerror-occurred-while-calling-zorg-apache-spark-api-python>
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
> On Fri, 7 May 2021 at 22:10, Sean Owen <sro...@gmail.com> wrote:
>
>> I don't see any reason to think this is related to YARN.
>> You haven't shown the actual error, @rajat, so I am not sure there is
>> anything to say.
>>
>> On Fri, May 7, 2021 at 3:08 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> I have a suspicion that this may be caused by your cluster, as it
>>> appears that you are running this in YARN mode, like below:
>>>
>>> spark-submit --master yarn --deploy-mode client xyx.py
>>>
>>> What happens if you try running it in local mode?
>>>
>>> spark-submit --master local[2] xyx.py
>>>
>>> Is this run in a managed cluster like GCP Dataproc?
>>>
>>> HTH
>>>
>>> On Fri, 7 May 2021 at 19:17, rajat kumar <kumar.rajat20...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Mich and Sean for the response. Yes, Sean is right; this is a
>>>> batch job.
>>>>
>>>> I have only 10 records in the dataframe, and it still throws this
>>>> exception.
>>>>
>>>> The following are the full logs:
>>>>
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 584, in foreach
>>>>   self.rdd.foreach(f)
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in foreach
>>>>   self.mapPartitions(processPartition).count()  # Force evaluation
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
>>>>   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
>>>>   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
>>>>   vals = self.mapPartitions(func).collect()
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
>>>>   sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>>> File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>>>>   answer, self.gateway_client, self.target_id, self.name)
>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>>>   return f(*a, **kw)
>>>> File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
>>>>   format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>> Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in
>>>> stage 3.0 (TID 10, 10.244.158.5, executor 1):
>>>> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
>>>>     func, profiler, deserializer, serializer = read_command(pickleSer, infile)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
>>>>     command = serializer._read_with_length(file)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
>>>>     return self.loads(obj)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
>>>>     return pickle.loads(obj, encoding=encoding)
>>>>   File "/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py", line 10, in <module>
>>>>     spark = SparkSession.builder.appName("test").getOrCreate()
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
>>>>     sc = SparkContext.getOrCreate(sparkConf)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
>>>>     SparkContext(conf=conf or SparkConf())
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in __init__
>>>>     SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in _ensure_initialized
>>>>     SparkContext._gateway = gateway or launch_gateway(conf)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>>>     return _launch_gateway(conf)
>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>>>     raise Exception("Java gateway process exited before sending its port number")
>>>> Exception: Java gateway process exited before sending its port number
>>>>
>>>> On Fri, May 7, 2021 at 9:35 PM Sean Owen <sro...@gmail.com> wrote:
>>>>
>>>>> foreach definitely works :)
>>>>> This is not a streaming question.
>>>>> The error says that the JVM worker died for some reason. You'd have to
>>>>> look at its logs to see why.
>>>>>
>>>>> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am not convinced foreach works even in 3.1.1.
>>>>>> Try doing the same with foreachBatch, along the lines of
>>>>>>
>>>>>> df.writeStream. \
>>>>>>     foreachBatch(sendToSink). \
>>>>>>     trigger(processingTime='2 seconds'). \
>>>>>>     start()
>>>>>>
>>>>>> and see if it works.
>>>>>>
>>>>>> HTH
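Following Sean's advice above, the reason the worker JVM died is recorded in the executor's own logs rather than the driver output. If the job is in fact submitted with --master yarn as discussed earlier in the thread (an assumption; the executor IPs could also belong to another cluster manager), the aggregated container logs can usually be pulled after the application ends with, for example (the application id is a placeholder):

yarn logs -applicationId <application_id>

The executor stderr in that output should contain the first Python worker traceback, which here is what exposed the module-level getOrCreate() call.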
>>>>>>
>>>>>> On Fri, 7 May 2021 at 16:07, rajat kumar <kumar.rajat20...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> I am using Spark 2.4.4 with Python.
>>>>>>>
>>>>>>> I am using the below line:
>>>>>>>
>>>>>>> dataframe.foreach(lambda record: process_logs(record))
>>>>>>>
>>>>>>> My use case is: process_logs will download the file from cloud
>>>>>>> storage using Python code, and then it will save the processed data.
>>>>>>>
>>>>>>> I am getting the following error:
>>>>>>>
>>>>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>>>>>>   return _launch_gateway(conf)
>>>>>>> File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>>>>>>   raise Exception("Java gateway process exited before sending its port number")
>>>>>>> Exception: Java gateway process exited before sending its port number
>>>>>>>
>>>>>>> Can anyone please suggest what can be done?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Rajat
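For the per-record pattern described in the original question, where each record triggers a download from cloud storage and a save, foreachPartition is often a better fit than foreach once the gateway problem is fixed: expensive setup such as a storage client happens once per partition instead of once per record. A minimal sketch, where make_storage_client and the two-argument process_logs are hypothetical stand-ins for the poster's actual helpers:

def process_logs_partition(rows):
    # hypothetical helper: build one cloud-storage client per partition
    client = make_storage_client()
    for record in rows:
        # hypothetical variant of the poster's per-record function that
        # reuses the shared client instead of creating its own
        process_logs(record, client)

# DataFrame.foreachPartition passes an iterator of Rows to the function
dataframe.foreachPartition(process_logs_partition)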