[SparkSQL] Count Distinct issue

2018-09-14 Thread Daniele Foroni
Hi all,

I am having some trouble doing a count distinct over multiple columns.
This is an example of my data:
+----+----+----+---+
|a   |b   |c   |d  |
+----+----+----+---+
|null|null|null|1  |
|null|null|null|2  |
|null|null|null|3  |
|null|null|null|4  |
|null|null|null|5  |
|null|null|null|6  |
|null|null|null|7  |
+----+----+----+---+
And my code:
import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.functions.{col, countDistinct}

val df: Dataset[Row] = …
val cols: List[Column] = df.columns.map(col).toList
df.agg(countDistinct(cols.head, cols.tail: _*))

So, in the example above, if I count the distinct “rows” I obtain 7 as a result,
as expected (since the “d” column changes for every row).
However, with more columns (16) in exactly the same situation (one incremental
column and 15 columns filled with nulls), the result is 0.
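
For reference, here is a self-contained snippet that reproduces the 4-column case
above with a local SparkSession (the values are the same illustrative ones as in the
table; the last line is only there to compare with distinct(), which treats null as a
regular value when deduplicating rows):

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, countDistinct}

val spark = SparkSession.builder().master("local[*]").appName("countDistinctRepro").getOrCreate()
import spark.implicits._

// Columns a, b, c are always null; d is incremental, as in the table above.
val df = Seq.tabulate(7)(i => (Option.empty[Int], Option.empty[Int], Option.empty[Int], i + 1))
  .toDF("a", "b", "c", "d")

val cols: List[Column] = df.columns.map(col).toList
df.agg(countDistinct(cols.head, cols.tail: _*)).show()

// For comparison: distinct() deduplicates whole rows, treating null as a regular value.
println(df.distinct().count())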

I don’t understand why I am experiencing this problem.
Any solution?

Thanks,
---
Daniele





[Spark Streaming] Measure latency

2018-06-26 Thread Daniele Foroni
Hi all,

I am using Spark Streaming and I need to evaluate the latency of the standard
aggregations (avg, min, max, …) provided by the Spark APIs.
Is there any way to do it in code?
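
Would something along the lines of the sketch below be the right direction? (This
assumes the DStream-based API; a StreamingListener only reports per-batch
scheduling/processing delays, not true per-record latency, and the method name
addLatencyLogging is just illustrative.)

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Register a listener that prints the delays of every completed micro-batch.
def addLatencyLogging(ssc: StreamingContext): Unit = {
  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      val info = batchCompleted.batchInfo
      // schedulingDelay: time the batch waited before processing started
      // processingDelay: time spent running the batch (including the aggregations)
      // totalDelay: scheduling + processing
      println(s"Batch ${info.batchTime}: " +
        s"scheduling=${info.schedulingDelay.getOrElse(-1L)} ms, " +
        s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
        s"total=${info.totalDelay.getOrElse(-1L)} ms")
    }
  })
}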

Thanks in advance,
---
Daniele



Dataset Caching and Unpersisting

2018-05-02 Thread Daniele Foroni
Hi all,

I am having trouble with caching and unpersisting a dataset.
I have a loop that filters my dataset at each iteration.
I realized that caching every x steps (e.g., every 50 steps) gives good performance.

However, after a certain number of caching operations, it seems that the memory
used for caching fills up, so I think I have to unpersist the old cached dataset.

This is my code:


I tried to use an external variable to cache and unpersist the old dataset, but it
doesn't seem to solve the problem (maybe I used it in the wrong way).
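
Roughly, this is the kind of pattern I am trying to get right (a simplified sketch,
not my actual code: the per-iteration filter is just a placeholder, names like
cacheEvery are illustrative, and the count() that materializes the new cache before
unpersisting the old one is only a guess at the right ordering):

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col

// Iteratively filter `df`, cache every `cacheEvery` iterations, and unpersist the
// previously cached dataset once the new one has been materialized.
def iterativeFilter(df: Dataset[Row], numIterations: Int, cacheEvery: Int = 50): Dataset[Row] = {
  var current: Dataset[Row] = df
  var previouslyCached: Dataset[Row] = null

  for (i <- 1 to numIterations) {
    current = current.filter(col("value") > i)   // placeholder for the real per-iteration filter

    if (i % cacheEvery == 0) {
      current = current.cache()
      current.count()                            // force materialization before dropping the old cache
      if (previouslyCached != null) previouslyCached.unpersist()
      previouslyCached = current
    }
  }
  current
}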
Do you have any suggestions?

Thank you for your support!
---
Daniele