Leo Timofeyev created SPARK-47859:
-------------------------------------

             Summary: Why does this lead to the memory leak?
                 Key: SPARK-47859
                 URL: https://issues.apache.org/jira/browse/SPARK-47859
             Project: Spark
          Issue Type: IT Help
          Components: Spark Core
    Affects Versions: 3.5.0, 3.3.2
            Reporter: Leo Timofeyev
         Attachments: Screenshot 2024-04-15 at 20.43.22.png
Hello Spark community. I have a Java Spark Structured Streaming application. The JedisCluster is closed in a finally block, but there is still a memory leak.

{code:java}
FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
StructType structSchema = getSchema();

VoidFunction2<Dataset<Row>, Long> forEachFunc = (dataset, aLong) -> {
    // Local renamed to batchDS: redeclaring "dataset" here would collide
    // with the lambda parameter and not compile.
    Dataset<Row> batchDS = getDataset();
    batchDS.persist();
    JavaRDD<Row> processedRDD = batchDS.javaRDD().mapPartitions(myFunction);
    Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
    parquetWriter.write(processedDS);
    batchDS.unpersist();
};

DataStreamWriter<Row> dataStream = dataset
    .writeStream()
    .foreachBatch(forEachFunc)
    .outputMode(outputMode)
    .option("checkpointLocation", checkpointLocation);
....<stream dataStream>
{code}

And the function:

{code:java}
public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {

    <constructor with jedisConfiguration parameter>...

    @Override
    public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
        List<Row> output = new ArrayList<>();
        // One cluster connection per partition, closed in the finally block.
        JedisCluster redis = new JedisCluster(jedisConfiguration);
        try {
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Long var1 = row.getAs("var1");
                Long var2 = row.getAs("var2");
                var redisKey = "some_key";
                var result = redis.hgetAll(redisKey);
                if (!result.isEmpty()) {
                    output.add(RowFactory.create(
                            var1, var2, result.getOrDefault("some_id", null)));
                }
            }
        } finally {
            try {
                redis.close();
            } catch (Exception e) {
                throw new RuntimeException("Failed to close Redis connection: " + e);
            }
        }
        return output.iterator();
    }
}
{code}

It actually runs for a couple of days and then dies. I can't figure out what causes the memory leak in the driver.
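For reference, the per-partition open/close pattern above reduces to ordinary try-with-resources, since JedisCluster implements Closeable. Below is a minimal, self-contained sketch of that pattern using a hypothetical FakeClient stand-in (Jedis itself is not assumed on the classpath); the resource is closed automatically even if the loop body throws, which replaces the manual finally block:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TryWithResourcesSketch {

    // Hypothetical stand-in for JedisCluster; any AutoCloseable works the same way.
    static class FakeClient implements AutoCloseable {
        boolean closed = false;

        String get(String key) {
            return "value:" + key;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static List<String> process(Iterator<String> keys) {
        List<String> output = new ArrayList<>();
        // close() is invoked automatically when this block exits,
        // whether normally or via an exception.
        try (FakeClient client = new FakeClient()) {
            while (keys.hasNext()) {
                output.add(client.get(keys.next()));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("a", "b").iterator())); // prints "[value:a, value:b]"
    }
}
```

This only tidies the closing logic; it does not by itself explain a driver-side leak, since the per-partition connections live on the executors.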
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org