[ https://issues.apache.org/jira/browse/SPARK-47859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leo Timofeyev updated SPARK-47859:
----------------------------------
    Description: 
Hello Spark community. I have a Java Spark Structured Streaming application. Unless I am making a silly mistake, the JedisCluster is closed in the finally block, yet there still seems to be a memory leak.
{code:java}
FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
StructType  structSchema = getSchema();

// foreachBatch callback; note that the batch arguments themselves are not used below
VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDS, batchId) -> {
        Dataset<Row> dataset = getDataset();
        dataset.persist();
        JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
        Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
        parquetWriter.write(processedDS);
        dataset.unpersist();
    };

DataStreamWriter<Row> dataStream = dataset
        .writeStream()
        .foreachBatch(forEachFunc)
        .outputMode(outputMode)
        .option("checkpointLocation", checkpointLocation);

....<stream dataStream> {code}
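One way to check whether the driver heap growth tracks micro-batches is to log JVM heap usage from inside the foreachBatch callback. This is a minimal sketch only; the {{BatchHeapLogger}} class and the SLF4J logger are illustrative names and not part of the application above, everything else is standard JDK and Spark API:
{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BatchHeapLogger {

    private static final Logger LOG = LoggerFactory.getLogger(BatchHeapLogger.class);
    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    // Wraps an existing foreachBatch callback and logs driver heap usage before each batch,
    // so a steady climb across batch ids shows up directly in the driver log.
    public static VoidFunction2<Dataset<Row>, Long> withHeapLogging(
            VoidFunction2<Dataset<Row>, Long> delegate) {
        return (batchDS, batchId) -> {
            long usedMb = MEMORY.getHeapMemoryUsage().getUsed() / (1024 * 1024);
            LOG.info("Starting batch {}: driver heap used = {} MB", batchId, usedMb);
            delegate.call(batchDS, batchId);
        };
    }
}
{code}
Hooked in as {{.foreachBatch(BatchHeapLogger.withHeapLogging(forEachFunc))}}, this would show whether used heap keeps climbing batch after batch, i.e. whether the growth accumulates per micro-batch rather than per record.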
And the function passed to mapPartitions:
{code:java}
public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {

   <constructor with jedisConfiguration parameter>...

    @Override
    public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {

        List<Row> output;
        // a new JedisCluster (with its own connection pools) is created on every call, i.e. per partition
        JedisCluster redis = new JedisCluster(jedisConfiguration);

        try {
            output = new ArrayList<>();

            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Long var1 = row.getAs("var1");
                Long var2 = row.getAs("var2");

                var redisKey = "some_key";
                var result = redis.hgetAll(redisKey);

                if (!result.isEmpty()) {
                    output.add(RowFactory.create(
                            var1,
                            var2,
                            result.getOrDefault("some_id", null)));
                }
            }
        } finally {
            if (redis != null) {
                try {
                    redis.close();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to close Redis 
connection: " + e);
                }
            }
        }
        return output.iterator();
    }
} {code}
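For comparison, the same per-partition logic can be written with try-with-resources instead of the explicit finally block. A sketch only, assuming the Jedis version in use has {{JedisCluster}} implementing {{java.io.Closeable}} (current releases do), and keeping the constructor and {{jedisConfiguration}} exactly as in the class above:
{code:java}
    @Override
    public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
        List<Row> output = new ArrayList<>();

        // try-with-resources closes the JedisCluster whether or not hgetAll throws,
        // and does not mask the original exception the way a throwing finally block can
        try (JedisCluster redis = new JedisCluster(jedisConfiguration)) {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Long var1 = row.getAs("var1");
                Long var2 = row.getAs("var2");

                var result = redis.hgetAll("some_key");
                if (!result.isEmpty()) {
                    output.add(RowFactory.create(var1, var2, result.getOrDefault("some_id", null)));
                }
            }
        }
        return output.iterator();
    }
{code}
Either way, a new {{JedisCluster}} is still created for every partition of every micro-batch; the sketch only changes how it is closed.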
It actually runs for a couple of days and then dies. I can't figure out what causes the memory leak in the driver.

Tested with Spark 3.3.2 and 3.5.0

Grafana dashboard of the driver's memory pool:
!Screenshot 2024-04-15 at 20.43.22.png|width=875,height=169!



> Why does this lead to the memory leak?
> --------------------------------------
>
>                 Key: SPARK-47859
>                 URL: https://issues.apache.org/jira/browse/SPARK-47859
>             Project: Spark
>          Issue Type: IT Help
>          Components: Spark Core
>    Affects Versions: 3.3.2, 3.5.0
>            Reporter: Leo Timofeyev
>            Priority: Major
>         Attachments: Screenshot 2024-04-15 at 20.43.22.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
