[Structured Streaming] Robust watermarking calculation with future timestamps

2019-11-13 Thread Anastasios Zouzias
Hi all,

We currently have the following issue with a Spark Structured Streaming
(SS) application. The application reads messages from thousands of source
systems, stores them in Kafka, and Spark aggregates them using SS with a
15-minute watermark.
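
For concreteness, the aggregation looks roughly like this (a minimal
sketch; the column names, window size, and variable names are
illustrative, not our exact code):

import org.apache.spark.sql.functions._

// events is the stream parsed from Kafka, with an event-time column
val counts = events
  .withWatermark("eventTime", "15 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("sourceId"))
  .count()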

The root problem is that a few of the source systems have a wrong timezone
setup that makes them emit messages from the future, i.e., +1 hour ahead of
the current time (a mis-configuration, or a winter/summer timezone change
(yeah!)).
Since the watermark is calculated as

(latest event timestamp seen across all messages) - (watermark threshold,
15 mins),

most of the messages are dropped because they appear to be delayed by more
than 45 minutes: with sources running an hour ahead, the watermark sits at
roughly (current time + 1 hour - 15 minutes), so every on-time message
looks 45 minutes late. In the extreme, even a single "future" /
adversarial message can make the structured streaming application report
zero messages (per mini-batch).

Is there any user-exposed SS API that allows a more robust calculation of
the watermark, e.g., the 95th percentile of timestamps instead of the max
timestamp? I understand that such a calculation would be more expensive,
but it would make the application more robust.

Any suggestions/ideas?

PS. Of course the best approach would be to fix the issue on all source
systems, but that might take time (or perhaps drop future messages
programmatically (yikes), as in the sketch below).
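
Here is a minimal sketch of that programmatic drop (the "eventTime" column
and the 2-minute clock-skew tolerance are assumptions, not our actual
code). The idea is to discard anything too far ahead of the wall clock
before the watermark sees it, so a single future event cannot drag the
watermark forward:

import org.apache.spark.sql.functions._

// current_timestamp() is evaluated once per micro-batch, so the
// comparison is stable within a batch; events more than 2 minutes
// ahead of the wall clock are dropped before watermarking.
val sanitized = events
  .filter(col("eventTime") <= current_timestamp() + expr("INTERVAL 2 MINUTES"))

val counts = sanitized
  .withWatermark("eventTime", "15 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("sourceId"))
  .count()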

Best regards,
Anastasios


error, saving dataframe, LEGACY_PASS_PARTITION_BY_AS_OPTIONS

2019-11-13 Thread asma zgolli
Hello,

I'm using Spark 2.4.4 and I keep receiving this error message. Can you
please help me identify the problem?

Thank you,

Yours sincerely,
Asma ZGOLLI

PhD student in data engineering - computer science

Attachment:



"main" java.lang.NoSuchMethodError:
org.apache.spark.sql.internal.SQLConf$.LEGACY_PASS_PARTITION_BY_AS_OPTIONS()Lorg/apache/spark/internal/config/ConfigEntry;

at org.apache.spark.sql.DataFrameWriter.saveToV1Source(
DataFrameWriter.scala:277)


Re: error, saving dataframe, LEGACY_PASS_PARTITION_BY_AS_OPTIONS

2019-11-13 Thread Femi Anthony
Can you post the line of code that's resulting in that error, along with
the stack trace?

Sent from my iPhone



Re: error, saving dataframe, LEGACY_PASS_PARTITION_BY_AS_OPTIONS

2019-11-13 Thread asma zgolli
The line of code that resulted in that error was:

ch.cern.sparkmeasure.StageMetrics.saveData


It also occurred while trying to save a dataframe to HBase using the
hbase-spark connector:

df1.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> catalog,
    HBaseTableCatalog.newTable -> "4"))
  .format("org.apache.hadoop.hbase.spark")
  .save()


The full stack trace is as follows:
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.sql.internal.SQLConf$.LEGACY_PASS_PARTITION_BY_AS_OPTIONS()Lorg/apache/spark/internal/config/ConfigEntry;
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:277)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  at ch.cern.sparkmeasure.StageMetrics.saveData(stagemetrics.scala:297)
  ...
  ...
  at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
  at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
  at scala.App$$anonfun$main$1.apply(App.scala:76)
  at scala.App$$anonfun$main$1.apply(App.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
  at scala.App$class.main(App.scala:76)


-- 
Asma ZGOLLI

PhD student in data engineering - computer science


Re: error, saving dataframe, LEGACY_PASS_PARTITION_BY_AS_OPTIONS

2019-11-13 Thread Russell Spitzer
My guess would be that this is a Spark version mismatch. The option was
added in 2.4.2:

https://github.com/apache/spark/commit/df9a50637e2622a15e9af7d837986a0e868878b1

https://issues.apache.org/jira/browse/SPARK-27453

I would make sure your Spark installs are all 2.4.4 and that your code is
compiled against the 2.4.4 Spark libs. For example, if one node had 2.4.1
or 2.4.0, you would end up with exactly this kind of error.
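
A quick way to confirm the versions line up (a minimal sketch; it assumes
an active SparkSession named spark and is not specific to your job):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Version on the driver:
println(org.apache.spark.SPARK_VERSION)

// Version on the executors (every line should print 2.4.4):
spark.range(1).rdd
  .map(_ => org.apache.spark.SPARK_VERSION)
  .collect()
  .foreach(println)

// Which jar SQLConf is actually loaded from, to spot a stale Spark jar
// lingering on the classpath:
println(Class.forName("org.apache.spark.sql.internal.SQLConf$")
  .getProtectionDomain.getCodeSource.getLocation)

And pin the compile-time dependency to the same version, e.g. in sbt:

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"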
