There are expected to be about 5 million UUIDs per day. I need this field to drop duplicate records and count them. If I simply count records without dropDuplicates, the job uses less than 1 GB of memory, so I believe most of the memory is occupied by the state store that keeps the state for dropDuplicates. But I cannot find a way to alleviate the problem.
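If an approximate count is acceptable, one alternative I am considering is to drop the dropDuplicates step and let approx_count_distinct do the counting, since it keeps only a small HyperLogLog++ sketch per day instead of one state entry per UUID. A minimal sketch, assuming the same events Dataset as in the quoted code below (the 0.01 relative error and the "uv" column name are my own choices):

    import org.apache.spark.sql.functions.approx_count_distinct

    // No per-UUID state is kept in the state store; each day's group holds
    // only a compact HyperLogLog++ sketch, so memory stays bounded even at
    // 5 million UUIDs per day, at the cost of roughly 1% counting error.
    val results = events
      .withWatermark("date", "1 day")
      .groupBy($"date")
      .agg(approx_count_distinct($"uuid", 0.01).alias("uv"))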
Michael Armbrust <mich...@databricks.com> wrote on Friday, September 15, 2017 at 3:35 AM:

> How many UUIDs do you expect to have in a day? That is likely where all
> the memory is being used. Does it work without that?
>
> On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:
>
>> Yes, my code is shown below (I also posted my code in another mail):
>>
>> import org.apache.spark.sql.streaming.Trigger
>> import play.api.libs.json.Json
>> import spark.implicits._
>>
>> /**
>>  * input
>>  */
>> val logs = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", BROKER_SERVER)
>>   .option("subscribe", TOPIC)
>>   .option("startingOffsets", "latest") // the option name is "startingOffsets" (plural)
>>   .load()
>>
>> /**
>>  * process
>>  */
>> val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>>
>> val events = logValues
>>   .map(parseFunction)
>>   .select(
>>     $"_1".alias("date").cast("timestamp"),
>>     $"_2".alias("uuid").cast("string")
>>   )
>>
>> val results = events
>>   .withWatermark("date", "1 day")
>>   .dropDuplicates("uuid", "date")
>>   .groupBy($"date")
>>   .count()
>>
>> /**
>>  * output
>>  */
>> val query = results
>>   .writeStream
>>   .outputMode("update")
>>   .format("console")
>>   .option("truncate", "false")
>>   .trigger(Trigger.ProcessingTime("1 seconds"))
>>   .start()
>>
>> query.awaitTermination()
>>
>> and I use Play JSON to parse the input logs from Kafka; the parse
>> function looks like:
>>
>> def parseFunction(str: String): (Long, String) = {
>>   val json = Json.parse(str)
>>   val timestamp = (json \ "time").as[Long] // .as[Long] rather than .toString().toLong
>>   // truncate the epoch-second timestamp to midnight of that day in UTC+8
>>   val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>>   val uuid = (json \ "uuid").as[String] // .toString() would keep the JSON quotes
>>   (date, uuid)
>> }
>>
>> and the Java heap space looks like this (I've increased the executor
>> memory to 15g):
>>
>> [image: image.png]
>>
>> Michael Armbrust <mich...@databricks.com> wrote on Wednesday, September 13, 2017 at 2:23 AM:
>>
>>> Can you show the full query you are running?
>>>
>>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm using Structured Streaming to count unique visits to our website.
>>>> I run Spark on YARN with 4 executor instances and have tried from
>>>> 2 cores * 5g memory up to 4 cores * 10g memory per executor, but there
>>>> are frequent full GCs, and once the count rises above roughly 4.5
>>>> million the application blocks and finally crashes with an OOM. That
>>>> seems unreasonable, so is there any suggestion for optimizing the
>>>> memory consumption of Structured Streaming? Thanks.
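If the count must stay exact, another possibility (only if an upgrade is an option, since this needs Spark 3.2 or later rather than the 2.x used here) is to move the deduplication state off the JVM heap by switching the state store to the RocksDB provider. A hedged sketch:

    import org.apache.spark.sql.SparkSession

    // Assumes Spark 3.2+. RocksDB keeps streaming state in native memory
    // and on local disk instead of the executor heap, so dropDuplicates
    // state no longer drives full GCs or heap OOMs.
    val spark = SparkSession.builder()
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      .getOrCreate()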