[ 
https://issues.apache.org/jira/browse/SPARK-47859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837420#comment-17837420
 ] 

Leo Timofeyev commented on SPARK-47859:
---------------------------------------

This is likely a duplicate.

> Why does javaRDD().mapPartitions lead to the memory leak in this case?
> ----------------------------------------------------------------------
>
>                 Key: SPARK-47859
>                 URL: https://issues.apache.org/jira/browse/SPARK-47859
>             Project: Spark
>          Issue Type: IT Help
>          Components: Spark Core
>    Affects Versions: 3.3.2, 3.5.0
>            Reporter: Leo Timofeyev
>            Priority: Major
>         Attachments: Screenshot 2024-04-15 at 20.43.22.png
>
>
> Hello Spark community. I have a Java Spark Structured Streaming application.
> Unless I am making a silly mistake, the JedisCluster is closed in the finally
> block, but there is still a memory leak:
> {code:java}
> FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
> StructType structSchema = getSchema();
> VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDS, batchId) -> {
>     batchDS.persist();
>     JavaRDD<Row> processedRDD = batchDS.javaRDD().mapPartitions(myFunction);
>     Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
>     parquetWriter.write(processedDS);
>     batchDS.unpersist();
> };
> DataStreamWriter<Row> dataStream = dataset
>         .writeStream()
>         .foreachBatch(forEachFunc)
>         .outputMode(outputMode)
>         .option("checkpointLocation", checkpointLocation);
> ....<stream dataStream> {code}
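> As a hedged aside (not part of the original report): if parquetWriter.write() throws, the unpersist() call above is skipped, so cached batches can accumulate, and the per-batch javaRDD()/createDataFrame() round-trip rebuilds an RDD plus query plan on the driver for every micro-batch. Below is a minimal sketch of the same batch function that guards unpersist() in finally and stays in the Dataset API. It assumes a hypothetical myMapPartitionsFunction implementing MapPartitionsFunction<Row, Row> (the Dataset API takes that interface rather than FlatMapFunction<Iterator<Row>, Row>), and Encoders.row() is only available as of Spark 3.5.0:
> {code:java}
> // Sketch only, under the assumptions above -- not the reporter's code.
> VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDS, batchId) -> {
>     batchDS.persist();
>     try {
>         // Dataset.mapPartitions avoids dropping to javaRDD() and re-creating
>         // a DataFrame on the driver for every micro-batch.
>         Dataset<Row> processedDS = batchDS.mapPartitions(
>                 myMapPartitionsFunction,      // hypothetical MapPartitionsFunction<Row, Row>
>                 Encoders.row(structSchema));  // Spark 3.5.0+
>         parquetWriter.write(processedDS);
>     } finally {
>         // Runs even if the write fails, so cached batches cannot pile up.
>         batchDS.unpersist();
>     }
> };
> {code}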
> And the function:
> {code:java}
> public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {
>     // <constructor with jedisConfiguration parameter>...
>     @Override
>     public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
>         List<Row> output;
>         JedisCluster redis = new JedisCluster(jedisConfiguration);
>         try {
>             output = new ArrayList<>();
>             while (rowIterator.hasNext()) {
>                 Row row = rowIterator.next();
>                 Long var1 = row.getAs("var1");
>                 Long var2 = row.getAs("var2");
>                 var redisKey = "some_key";
>                 var result = redis.hgetAll(redisKey);
>                 if (!result.isEmpty()) {
>                     output.add(RowFactory.create(
>                             var1,
>                             var2,
>                             result.getOrDefault("some_id", null)));
>                 }
>             }
>         } finally {
>             if (redis != null) {
>                 try {
>                     redis.close();
>                 } catch (Exception e) {
>                     throw new RuntimeException("Failed to close Redis connection: " + e);
>                 }
>             }
>         }
>         return output.iterator();
>     }
> } {code}
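> Another hedged aside: JedisCluster implements Closeable, so try-with-resources expresses the same cleanup more idiomatically and still closes the connection if the iteration itself throws. A sketch of the same call(), under the same assumptions as the class above:
> {code:java}
> // Sketch only: identical logic, with the JedisCluster closed automatically.
> @Override
> public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
>     List<Row> output = new ArrayList<>();
>     try (JedisCluster redis = new JedisCluster(jedisConfiguration)) {
>         while (rowIterator.hasNext()) {
>             Row row = rowIterator.next();
>             Long var1 = row.getAs("var1");
>             Long var2 = row.getAs("var2");
>             Map<String, String> result = redis.hgetAll("some_key");
>             if (!result.isEmpty()) {
>                 output.add(RowFactory.create(var1, var2, result.getOrDefault("some_id", null)));
>             }
>         }
>     }
>     return output.iterator();
> }
> {code}
> Note that buffering the whole partition into an ArrayList holds every output row in memory at once either way; that pressure lands on the executors, though, not on the driver pool shown in the screenshot.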
> It actually works for a couple of days and then dies. I can't figure out what
> causes the memory leak in the driver.
> Tested with Spark 3.3.2 and 3.5.0.
> Grafana board of the driver's memory pool:
> !Screenshot 2024-04-15 at 20.43.22.png|width=875,height=169!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
