Re: Structured Streaming: distinct (Spark 2.2)

2018-03-19 Thread Burak Yavuz
I believe the docs are out of date regarding distinct. The behavior should
be as follows:

 - Distinct should be applied across triggers
 - In order to prevent the state from growing indefinitely, you need to add
a watermark (see the sketch after this list)
 - If you don't have a watermark, but your key space is small, that's also
fine
 - If a record arrives and is not in the state, it will be output
 - If a record arrives and is in the state, it will be ignored
 - Once the watermark passes for a key, it will be dropped from state
 - If a record arrives late, i.e. with an event time behind the watermark, it
will be ignored
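
For reference, deduplication with a watermark looks roughly like the sketch
below (a minimal example, not from any real job: the broker, topic, the
eventId/eventTime schema, and the 10-minute threshold are all assumptions).
Note that distinct() is just dropDuplicates() over all columns; to let the
watermark bound the state, use dropDuplicates with the event-time column
included:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()

    // Hypothetical Kafka source; broker, topic, and the eventId/eventTime
    // columns are assumptions.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS eventId", "timestamp AS eventTime")

    // The watermark is what bounds the dedup state: once the watermark
    // passes an event time, keys at that event time are dropped from state,
    // and records arriving behind the watermark are ignored.
    val deduped = events
      .withWatermark("eventTime", "10 minutes")
      .dropDuplicates("eventId", "eventTime")

    val query = deduped.writeStream
      .outputMode("update")
      .format("console")
      .start()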

HTH!
Burak


On Mon, Mar 19, 2018 at 12:04 PM, Geoff Von Allmen wrote:

> I see in the documentation that the distinct operation is not supported
> in Structured Streaming. That being said, I have noticed that you are able
> to successfully call distinct() on a streaming DataFrame and it seems to
> perform the desired operation and doesn’t fail with the AnalysisException
> as expected. If I call it with a column name specified, then it will fail
> with an AnalysisException.
>
> I am using Structured Streaming to read from a Kafka stream, and my
> question (and concern) is:
>
>    - The distinct operation is properly applied within the *current*
>      batch as read from Kafka, but it would not apply across batches.
>
> I have tried the following:
>
>    - Started the streaming job to see my baseline data and left the job
>      streaming
>    - Created events in Kafka that would increment my counts if distinct
>      was not performing as expected
>    - Results:
>       - Distinct still seems to be working over the entire data set even
>         as I add new data.
>       - As I add new data, I see Spark process the data (I’m doing output
>         mode = update), but there are no new results, indicating the
>         distinct function is in fact still working across batches as
>         Spark pulls in the new data from Kafka.
>
> Does anyone know more about the intended behavior of distinct in
> Structured Streaming?
>
> If this is working as intended, does this mean I could have a dataset that
> is growing without bound, held in memory/disk or something to that effect
> (so that it has some way to apply the distinct operation against previous
> data)?
>
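
On the unbounded-state question above: without a watermark, the
deduplication state does grow without bound (it lives in the executors'
state store and is checkpointed to disk). One way to watch whether it stays
bounded (a sketch, assuming the query handle from the example earlier in
this message) is the streaming query's progress metrics:

    // lastProgress can be null before the first micro-batch completes.
    val progress = query.lastProgress
    if (progress != null) {
      // stateOperators reports rows and memory held by stateful operators
      // such as streaming deduplication.
      progress.stateOperators.foreach { s =>
        println(s"rows in state: ${s.numRowsTotal}, " +
          s"memory used: ${s.memoryUsedBytes} bytes")
      }
    }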

