Leo Timofeyev created SPARK-47859:
-------------------------------------

             Summary: Why does this lead to the memory leak?
                 Key: SPARK-47859
                 URL: https://issues.apache.org/jira/browse/SPARK-47859
             Project: Spark
          Issue Type: IT Help
          Components: Spark Core
    Affects Versions: 3.5.0, 3.3.2
            Reporter: Leo Timofeyev
         Attachments: Screenshot 2024-04-15 at 20.43.22.png

Hello Spark community. I have a Java Spark Structured Streaming application. The JedisCluster is closed in a finally block, but there is still a memory leak.


{code:java}
FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
StructType structSchema = getSchema();

VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDF, batchId) -> {
        Dataset<Row> dataset = getDataset();
        dataset.persist();
        JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
        Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
        parquetWriter.write(processedDS);
        dataset.unpersist();
    };

DataStreamWriter<Row> dataStream = dataset
.writeStream()
.foreachBatch(forEachFunc)
.outputMode(outputMode)
.option("checkpointLocation", checkpointLocation);

....<stream dataStream> {code}
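One thing I am not sure about in the batch function above: if parquetWriter.write(processedDS) throws, dataset.unpersist() is never reached, so cached blocks could accumulate across micro-batches. A variant that guards the unpersist with try/finally (same names as above):

{code:java}
VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDF, batchId) -> {
    Dataset<Row> dataset = getDataset();
    dataset.persist();
    try {
        JavaRDD<Row> processedRDD = dataset.javaRDD().mapPartitions(myFunction);
        Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
        parquetWriter.write(processedDS);
    } finally {
        // Runs even if the write fails, so the cached blocks are always released.
        dataset.unpersist();
    }
};
{code}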

And the function:

{code:java}
public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {

   <constructor with jedisConfiguration parameter>...

    @Override
    public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {

        List<Row> output;
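        // A new JedisCluster is opened for every partition of every micro-batch.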
        JedisCluster redis = new JedisCluster(jedisConfiguration);

        try {
            output = new ArrayList<>();

            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Long var1 = row.getAs("var1");
                Long var2 = row.getAs("var2");

                var redisKey = "some_key";
                var result = redis.hgetAll(redisKey);

                if (!result.isEmpty()) {
                    output.add(RowFactory.create(
                            var1,
                            var2,
                            result.getOrDefault("some_id", null)));
                }
            }
        } finally {
            if (redis != null) {
                try {
                    redis.close();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to close Redis connection: " + e);
                }
            }
        }
        return output.iterator();
    }
} {code}
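For what it's worth, JedisCluster implements Closeable, so the explicit finally block can also be written as try-with-resources; a sketch of the same call method (same fields and schema as above):

{code:java}
@Override
public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
    List<Row> output = new ArrayList<>();

    // try-with-resources closes the cluster client even if hgetAll throws.
    try (JedisCluster redis = new JedisCluster(jedisConfiguration)) {
        while (rowIterator.hasNext()) {
            Row row = rowIterator.next();
            Long var1 = row.getAs("var1");
            Long var2 = row.getAs("var2");

            var redisKey = "some_key";
            var result = redis.hgetAll(redisKey);

            if (!result.isEmpty()) {
                output.add(RowFactory.create(
                        var1, var2, result.getOrDefault("some_id", null)));
            }
        }
    }
    return output.iterator();
}
{code}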
It actually runs for a couple of days and then dies. I can't figure out what is causing the memory leak in the driver.
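
One more variant I am considering, to avoid opening and tearing down a full cluster connection pool on every partition of every micro-batch: a client shared per executor JVM. The JedisClusterHolder class below is hypothetical (not in my code), assuming jedisConfiguration is a Set<HostAndPort>:

{code:java}
import java.util.Set;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

// Hypothetical helper: one JedisCluster per executor JVM, reused across
// partitions and micro-batches instead of being recreated in every call().
public final class JedisClusterHolder {
    private static volatile JedisCluster instance;

    private JedisClusterHolder() {}

    public static JedisCluster get(Set<HostAndPort> jedisConfiguration) {
        if (instance == null) {
            synchronized (JedisClusterHolder.class) {
                if (instance == null) {
                    instance = new JedisCluster(jedisConfiguration);
                    // Best-effort close of the shared client on JVM shutdown.
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        try {
                            instance.close();
                        } catch (Exception ignored) {
                        }
                    }));
                }
            }
        }
        return instance;
    }
}
{code}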


