[ https://issues.apache.org/jira/browse/SPARK-47859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837420#comment-17837420 ]
Leo Timofeyev commented on SPARK-47859:
---------------------------------------

This is likely a duplicate.

> Why does javaRDD().mapPartitions lead to the memory leak in this case?
> ----------------------------------------------------------------------
>
>                 Key: SPARK-47859
>                 URL: https://issues.apache.org/jira/browse/SPARK-47859
>             Project: Spark
>          Issue Type: IT Help
>          Components: Spark Core
>    Affects Versions: 3.3.2, 3.5.0
>            Reporter: Leo Timofeyev
>            Priority: Major
>         Attachments: Screenshot 2024-04-15 at 20.43.22.png
>
>
> Hello Spark community. I have a Java Spark Structured Streaming application.
> Unless I am making a silly mistake, the JedisCluster is closed in the finally block, but there is still a memory leak.
> {code:java}
> FlatMapFunction<Iterator<Row>, Row> myFunction = new MyFunction(jedisConfiguration);
> StructType structSchema = getSchema();
> // Each micro-batch arrives as the first argument of the foreachBatch function
> VoidFunction2<Dataset<Row>, Long> forEachFunc = (batchDataset, batchId) -> {
>     batchDataset.persist();
>     JavaRDD<Row> processedRDD = batchDataset.javaRDD().mapPartitions(myFunction);
>     Dataset<Row> processedDS = sparkSession().createDataFrame(processedRDD, structSchema);
>     parquetWriter.write(processedDS);
>     batchDataset.unpersist();
> };
> DataStreamWriter<Row> dataStream = dataset
>         .writeStream()
>         .foreachBatch(forEachFunc)
>         .outputMode(outputMode)
>         .option("checkpointLocation", checkpointLocation);
> ....<stream dataStream> {code}
> And the function:
> {code:java}
> public class MyFunction implements FlatMapFunction<Iterator<Row>, Row> {
>
>     <constructor with jedisConfiguration parameter>...
>
>     @Override
>     public Iterator<Row> call(Iterator<Row> rowIterator) throws Exception {
>         // One cluster connection per partition, closed in the finally block below
>         JedisCluster redis = new JedisCluster(jedisConfiguration);
>         List<Row> output = new ArrayList<>();
>         try {
>             while (rowIterator.hasNext()) {
>                 Row row = rowIterator.next();
>                 Long var1 = row.getAs("var1");
>                 Long var2 = row.getAs("var2");
>                 var redisKey = "some_key";
>                 var result = redis.hgetAll(redisKey);
>                 if (!result.isEmpty()) {
>                     output.add(RowFactory.create(
>                             var1,
>                             var2,
>                             result.getOrDefault("some_id", null)));
>                 }
>             }
>         } finally {
>             try {
>                 redis.close();
>             } catch (Exception e) {
>                 // Chain the cause instead of concatenating it into the message
>                 throw new RuntimeException("Failed to close Redis connection", e);
>             }
>         }
>         return output.iterator();
>     }
> } {code}
> It actually works for a couple of days and then dies. I can't figure out what causes the memory leak in the driver.
> Tested with Spark 3.3.2 and 3.5.0.
> Grafana board of the driver's memory pool:
> !Screenshot 2024-04-15 at 20.43.22.png|width=875,height=169!
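
One thing worth trying to narrow the problem down: the code above opens a new JedisCluster for every partition of every micro-batch. A common alternative is to share one lazily-created cluster per executor JVM and close it only at JVM shutdown. The sketch below is a minimal illustration, not from the report: JedisClusterHolder is a hypothetical helper, and it assumes the connection config can be expressed as a Set<HostAndPort> (the real jedisConfiguration type may differ).

{code:java}
import java.util.Set;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

// Hypothetical helper (not in the original code): one shared JedisCluster
// per executor JVM, created lazily on first use, instead of one connection
// per partition per micro-batch.
public final class JedisClusterHolder {

    private static volatile JedisCluster cluster;

    private JedisClusterHolder() {}

    public static JedisCluster get(Set<HostAndPort> nodes) {
        if (cluster == null) {
            synchronized (JedisClusterHolder.class) {
                if (cluster == null) {
                    JedisCluster created = new JedisCluster(nodes);
                    // Close the shared connection only when the executor JVM exits.
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        try {
                            created.close();
                        } catch (Exception ignored) {
                            // best effort on shutdown
                        }
                    }));
                    cluster = created;
                }
            }
        }
        return cluster;
    }
}
{code}

With this, MyFunction.call would fetch the shared instance via JedisClusterHolder.get(...) and skip the per-partition close. To be clear, this only removes connection churn on the executors; since the Grafana board shows growth in the driver's memory pool, the leak may instead come from per-batch state on the driver side, so this sketch is a way to rule one cause out rather than a confirmed fix.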