How many UUIDs do you expect to have in a day?  That is likely where all
the memory is being used.  Does it work without that?
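
A rough back-of-envelope sketch of why the number of UUIDs matters: streaming deduplication has to keep every distinct (uuid, date) key in state until the watermark lets it expire. The object name, the function, the retention of two days, and especially the 200 bytes per key below are illustrative assumptions, not measured Spark figures.

```scala
// Back-of-envelope estimate of streaming deduplication state size.
// Every name and number here is hypothetical and for illustration only;
// real per-key overhead depends on the Spark version and state store.
object DedupStateEstimate {

  /** Retained keys = distinct (uuid, date) pairs per day * days kept in state,
    * multiplied by an assumed average footprint per key. */
  def estimateStateBytes(distinctKeysPerDay: Long,
                         daysRetained: Int,
                         bytesPerKey: Long): Long =
    distinctKeysPerDay * daysRetained * bytesPerKey

  def main(args: Array[String]): Unit = {
    // 4.5 million is the count mentioned in the thread; the rest is assumed.
    val bytes = estimateStateBytes(
      distinctKeysPerDay = 4500000L,
      daysRetained = 2,
      bytesPerKey = 200L)
    println(s"approx. $bytes bytes of dedup state")
  }
}
```

Plugging in the real daily UUID count and a measured per-key size gives a first idea of whether the state can fit in the executors' heap at all.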

On Tue, Sep 12, 2017 at 8:42 PM, 张万新 <kevinzwx1...@gmail.com> wrote:

> *Yes, my code is shown below (I also posted it in another mail):*
> /**
>     * input
>     */
>   val logs = spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", BROKER_SERVER)
>     .option("subscribe", TOPIC)
>     .option("startingOffsets", "latest")
>     .load()
>
>   /**
>     * process
>     */
>   val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]
>
>   val events = logValues
>     .map(parseFunction)
>     .select(
>       $"_1".alias("date").cast("timestamp"),
>       $"_2".alias("uuid").cast("string")
>     )
>
>   val results = events
>     .withWatermark("date", "1 day")
>     .dropDuplicates("uuid", "date")
>     .groupBy($"date")
>     .count()
>
>   /**
>     * output
>     */
>   val query = results
>     .writeStream
>     .outputMode("update")
>     .format("console")
>     .option("truncate", "false")
>     .trigger(Trigger.ProcessingTime("1 seconds"))
>     .start()
>
>   query.awaitTermination()
>
> *I use Play JSON to parse the input logs from Kafka. The parse function
> looks like this:*
>
>   def parseFunction(str: String): (Long, String) = {
>     val json = Json.parse(str)
>     val timestamp = (json \ "time").get.toString().toLong
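>     // Truncate the epoch-seconds timestamp to whole days, then shift back
>     // 8 hours so the bucket starts at midnight UTC+8 (result is in seconds).
>     val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60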
>     val uuid = (json \ "uuid").get.toString()
>     (date, uuid)
>   }
>
> The Java heap usage looks like this (I've increased the executor memory to 15g):
>
> [image: image.png]
> On Wed, Sep 13, 2017 at 2:23 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Can you show the full query you are running?
>>
>> On Tue, Sep 12, 2017 at 10:11 AM, 张万新 <kevinzwx1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm using Structured Streaming to count unique visits to our website. I
>>> run Spark on YARN with 4 executor instances, and have tried everything
>>> from 2 cores and 5g of memory up to 4 cores and 10g of memory per
>>> executor, but there are frequent full GCs, and once the count rises above
>>> about 4.5 million the application stalls and finally crashes with an OOM
>>> error. That seems unreasonable. Are there any suggestions for reducing
>>> the memory consumption of Structured Streaming? Thanks.
>>>
>>
>>
