Thanks Mich and Sean for the response . Yes Sean is right. This is a batch
job.

  I am having only 10 records in the dataframe still it is giving this
exception

Following are the full logs.

File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line
584, in foreach
    self.rdd.foreach(f)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in
foreach
    self.mapPartitions(processPartition).count()  # Force evaluation
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in
count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in
collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63,
in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage
3.0 (TID 10, 10.244.158.5, executor 1):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in
main
    func, profiler, deserializer, serializer = read_command(pickleSer,
infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
580, in loads
    return pickle.loads(obj, encoding=encoding)
  File
"/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py",
line 10, in <module>
    spark = SparkSession.builder.appName("test").getOrCreate()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line
173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in
getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in
__init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in
_ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
46, in launch_gateway
    return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
108, in _launch_gateway
    raise Exception("Java gateway process exited before sending its port
number")
Exception: Java gateway process exited before sending its port number

On Fri, May 7, 2021 at 9:35 PM Sean Owen <sro...@gmail.com> wrote:

> foreach definitely works :)
> This is not a streaming question.
> The error says that the JVM worker died for some reason. You'd have to
> look at its logs to see why.
>
> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am not convinced foreach works even in 3.1.1
>> Try doing the same with foreachBatch
>>
>>                      foreachBatch(sendToSink). \
>>                     trigger(processingTime='2 seconds'). \
>>
>> and see it works
>>
>> HTH
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 7 May 2021 at 16:07, rajat kumar <kumar.rajat20...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am using Spark 2.4.4 with Python
>>>
>>> While using below line:
>>>
>>> dataframe.foreach(lambda record : process_logs(record))
>>>
>>>
>>> My use case is , process logs will download the file from cloud storage
>>> using Python code and then it will save the processed data.
>>>
>>> I am getting the following error
>>>
>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
>>> 46, in launch_gateway
>>>     return _launch_gateway(conf)
>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
>>> 108, in _launch_gateway
>>>     raise Exception("Java gateway process exited before sending its port
>>> number")
>>> Exception: Java gateway process exited before sending its port number
>>>
>>> Can anyone pls suggest what can be done?
>>>
>>> Thanks
>>> Rajat
>>>
>>

Reply via email to