How many UUIDs do you expect to have in a day? That is likely where all the memory is being used. Does it work without that?
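If exact per-day uniques aren't strictly required, one way to bound that state is to replace the exact deduplication with an approximate distinct count. A minimal sketch (assuming the same `events` Dataset as in the quoted code below, and a Spark version whose streaming aggregation supports this function):

```scala
// Hypothetical alternative to dropDuplicates + count: the HyperLogLog-based
// approx_count_distinct keeps a fixed-size sketch per group in the state
// store, instead of one state entry per distinct uuid per day.
import org.apache.spark.sql.functions.approx_count_distinct

val results = events
  .withWatermark("date", "1 day")
  .groupBy($"date")
  .agg(approx_count_distinct($"uuid").alias("uv"))
```

The trade-off is a small relative error in the count in exchange for state that no longer grows with the number of distinct visitors.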
On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> Yes, my code is shown below (I also posted it in another mail):
>
> /**
>  * input
>  */
> val logs = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", BROKER_SERVER)
>   .option("subscribe", TOPIC)
>   .option("startingOffsets", "latest")
>   .load()
>
> /**
>  * process
>  */
> val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>
> val events = logValues
>   .map(parseFunction)
>   .select(
>     $"_1".alias("date").cast("timestamp"),
>     $"_2".alias("uuid").cast("string")
>   )
>
> val results = events
>   .withWatermark("date", "1 day")
>   .dropDuplicates("uuid", "date")
>   .groupBy($"date")
>   .count()
>
> /**
>  * output
>  */
> val query = results
>   .writeStream
>   .outputMode("update")
>   .format("console")
>   .option("truncate", "false")
>   .trigger(Trigger.ProcessingTime("1 second"))
>   .start()
>
> query.awaitTermination()
>
> I use Play JSON to parse the input logs from Kafka; the parse function is:
>
> def parseFunction(str: String): (Long, String) = {
>   val json = Json.parse(str)
>   val timestamp = (json \ "time").get.toString().toLong
>   // truncate to the start of the day, then shift the label by 8 hours
>   val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
>   val uuid = (json \ "uuid").get.toString()
>   (date, uuid)
> }
>
> The Java heap usage looks like this (I've increased the executor memory to 15g):
>
> [image: image.png]
>
> Michael Armbrust <mich...@databricks.com> wrote on Wed, Sep 13, 2017 at 2:23 AM:
>
>> Can you show the full query you are running?
>>
>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm using structured streaming to count unique visits of our website. I
>>> run Spark on YARN with 4 executor instances, and I've tried from 2 cores *
>>> 5g memory up to 4 cores * 10g memory per executor, but there are frequent
>>> full GCs, and once the count rises to more than about 4.5 million the
>>> application gets blocked and finally crashes with an OOM. That seems
>>> unreasonable. So is there any suggestion for optimizing the memory
>>> consumption of Structured Streaming? Thanks.
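As an aside, the hand-rolled bucketing in `parseFunction` is easier to check when written out in plain Scala, away from Spark. A minimal sketch of the same arithmetic (the constant and the test value here are illustrative assumptions):

```scala
// Plain-Scala sketch of parseFunction's day bucketing: integer division
// truncates an epoch-seconds timestamp to the start of its UTC day, then
// the "- 8" shifts the resulting label back by 8 hours. Algebraically this
// is (ts / 86400) * 86400 - 28800.
object DayBucket {
  val SecondsPerDay = 60 * 60 * 24

  def bucket(epochSeconds: Long): Long =
    (epochSeconds / SecondsPerDay * 24 - 8) * 60 * 60
}
```

Note that this truncates on UTC day boundaries and only shifts the label afterwards; if the intent is to bucket events by UTC+8 local days, the 8-hour offset would normally be added to the timestamp *before* the integer division, otherwise events within 8 hours of the local midnight land in the adjacent bucket.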