Thanks Mich and Sean for the responses. Yes, Sean is right: this is a batch job.
There are only 10 records in the DataFrame, yet it still throws this exception. The full logs are below:

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 584, in foreach
    self.rdd.foreach(f)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in foreach
    self.mapPartitions(processPartition).count()  # Force evaluation
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 10, 10.244.158.5, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py", line 10, in <module>
    spark = SparkSession.builder.appName("test").getOrCreate()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
    return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
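The nested worker traceback points at the likely cause: unpickling the foreach function on the executor imports module/read_data.py, and that module builds a SparkSession at import time (line 10, in <module>), so every Python worker tries to launch its own JVM gateway and dies. A minimal sketch of a guarded layout (the function split is an assumption; only the getOrCreate() call and module path come from the trace):

    # read_data.py -- hypothetical layout; only line 10's getOrCreate()
    # call is known from the traceback above.
    from pyspark.sql import SparkSession

    def get_spark():
        # Build (or reuse) the session only when explicitly called on the
        # driver, never as a side effect of importing this module.
        return SparkSession.builder.appName("test").getOrCreate()

    def process_logs(record):
        # Executor-side work: keep this plain Python (download from cloud
        # storage, process, save), with no SparkSession reference at all.
        pass

    if __name__ == "__main__":
        spark = get_spark()  # driver only

With that guard, importing the module on a worker no longer touches the JVM gateway, and only the driver ever constructs the SparkContext.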
On Fri, May 7, 2021 at 9:35 PM Sean Owen <sro...@gmail.com> wrote:

> foreach definitely works :)
> This is not a streaming question.
> The error says that the JVM worker died for some reason. You'd have to
> look at its logs to see why.
>
> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I am not convinced foreach works even in 3.1.1.
>> Try doing the same with foreachBatch:
>>
>>     foreachBatch(sendToSink). \
>>         trigger(processingTime='2 seconds'). \
>>
>> and see if it works.
>>
>> HTH
>>
>> On Fri, 7 May 2021 at 16:07, rajat kumar <kumar.rajat20...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I am using Spark 2.4.4 with Python.
>>>
>>> I am using the line below:
>>>
>>>     dataframe.foreach(lambda record: process_logs(record))
>>>
>>> My use case: process_logs downloads a file from cloud storage using
>>> Python code and then saves the processed data.
>>>
>>> I am getting the following error:
>>>
>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>>     return _launch_gateway(conf)
>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>>     raise Exception("Java gateway process exited before sending its port number")
>>> Exception: Java gateway process exited before sending its port number
>>>
>>> Can anyone please suggest what can be done?
>>>
>>> Thanks
>>> Rajat
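For context, a self-contained sketch of the batch pattern discussed in this thread, assuming the import-time SparkSession above has been removed (the input path and column name are invented for illustration; the real process_logs body is not shown in the thread):

    from pyspark.sql import SparkSession

    def process_logs(record):
        # Plain-Python per-record work on the executor: download the file,
        # process it, save the result. Illustrative body only.
        path = record["log_path"]  # hypothetical column name
        # download(path); process; save

    if __name__ == "__main__":
        spark = SparkSession.builder.appName("test").getOrCreate()
        df = spark.read.parquet("/data/logs_meta")  # hypothetical input

        # Per record, as in the original question:
        df.foreach(process_logs)

        # Or per partition, to set up one storage client per partition
        # instead of one per record:
        def handle_partition(rows):
            # client = make_client()  # hypothetical, created once here
            for row in rows:
                process_logs(row)
        df.foreachPartition(handle_partition)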
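And since foreachBatch came up: it belongs to Structured Streaming writes, which, as Sean points out, do not apply to a batch job like this one. A sketch of the chain Mich's fragment implies, with a toy rate source and a parquet sink standing in as assumptions for the real ones:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("test").getOrCreate()
    streaming_df = spark.readStream.format("rate").load()  # toy source

    def sendToSink(batch_df, batch_id):
        # Each micro-batch arrives here as an ordinary DataFrame.
        batch_df.write.mode("append").parquet("/tmp/out")  # illustrative sink

    query = streaming_df.writeStream \
        .foreachBatch(sendToSink) \
        .trigger(processingTime="2 seconds") \
        .option("checkpointLocation", "/tmp/chk") \
        .start()
    query.awaitTermination()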