Streaming : WAL ignored

2018-05-24 Thread Walid Lezzar
Hi,

I have a Spark Streaming application running on YARN that consumes from a JMS 
source. I have checkpointing and the WAL enabled to ensure zero data loss. 
However, when I suddenly kill my application and restart it, it sometimes 
recovers the data from the WAL and sometimes it doesn't! In all cases, I 
can see the WAL written correctly on HDFS. 

Can someone explain why my WAL is sometimes ignored on restart? Under what 
conditions does Spark decide whether or not to recover from the WAL?
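
For reference, a minimal Scala sketch of how checkpointing and the receiver WAL are
typically wired together; the checkpoint path, app name and batch interval below are
placeholders, not our actual values. As far as I understand, the WAL is only replayed
when the StreamingContext is recovered from the checkpoint directory via getOrCreate;
if a brand new context is built on restart instead, the existing WAL is ignored.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/jms-streaming-app"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("jms-streaming-app")  // placeholder name
    // write received blocks to a WAL on HDFS before they are processed
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create the JMS receiver stream and the processing graph here ...
  ssc
}

// recover from the checkpoint (and replay the WAL) if one exists,
// otherwise build a brand new context
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()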

Thanks,
Walid.



Re: How to read the schema of a partitioned dataframe without listing all the partitions ?

2018-04-27 Thread Walid Lezzar
I’m using Spark 2.3 with schema merging set to false. I don’t think Spark is 
actually reading any file, but it tries to list all of them one by one, and that 
is super slow on S3. 

Pointing to a single partition manually is not an option: it requires me to know 
the partitioning scheme in order to add it to the path, and Spark doesn’t include 
the partitioning column in the result in that case.
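
For anyone hitting the same issue who does know the partitioning scheme, one partial
workaround (a sketch only; the bucket path and the day=... layout below are
placeholders): infer the schema from a single partition while keeping the partition
column via the basePath option, then pass that schema explicitly when reading the whole
table so Spark skips schema inference. Directory listing for partition discovery can
still happen once a query actually runs.

val base = "s3a://my-bucket/my-table"  // placeholder

// read one partition only, but keep the "day" partition column
// by telling Spark where the table root is
val onePartition = spark.read
  .option("basePath", base)
  .parquet(s"$base/day=2018-01-01")
val schema = onePartition.schema

// with an explicit schema, Spark does not read file footers to infer it
val df = spark.read.schema(schema).parquet(base)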

> On 27 Apr 2018, at 16:07, Yong Zhang <java8...@hotmail.com> wrote:
> 
> What version of Spark are you using?
> 
> You can search "spark.sql.parquet.mergeSchema" on 
> https://spark.apache.org/docs/latest/sql-programming-guide.html
> 
> Starting from Spark 1.5, the default is already "false", which means Spark 
> shouldn't scan all the parquet files to generate the schema.
> 
> Yong
> 
> 
> 
> From: Walid LEZZAR <walez...@gmail.com>
> Sent: Friday, April 27, 2018 7:42 AM
> To: spark users
> Subject: How to read the schema of a partitioned dataframe without listing 
> all the partitions ?
>  
> Hi,
> 
> I have a Parquet table on S3 partitioned by day, with 2 years of data (about 
> 1000 partitions). With Spark, when I just want to know the schema of this 
> table without even asking for a single row of data, Spark tries to list all 
> the partitions and the nested partitions, which makes it very slow just to 
> build the DataFrame object in Zeppelin.
> 
> Is there a way to avoid that? Is there a way to tell Spark: "hey, just read a 
> single partition, give me the schema of that partition, and consider it the 
> schema of the whole DataFrame"? (I don't care about schema merging; it's 
> turned off, by the way.)
> 
> Thanks.
> Walid.


How to read the schema of a partitioned dataframe without listing all the partitions ?

2018-04-27 Thread Walid LEZZAR
Hi,

I have a Parquet table on S3 partitioned by day, with 2 years of data (about
1000 partitions). With Spark, when I just want to know the schema of this
table without even asking for a single row of data, Spark tries to list
all the partitions and the nested partitions, which makes it very slow just
to build the DataFrame object in Zeppelin.

Is there a way to avoid that? Is there a way to tell Spark: "hey, just read
a single partition, give me the schema of that partition, and consider it
the schema of the whole DataFrame"? (I don't care about schema merging;
it's turned off, by the way.)
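
For illustration, a minimal sketch of the "tell Spark the schema up front" idea: with a
user-provided schema, Spark skips inferring it from the files. The column names below
are hypothetical, and directory listing for partition discovery can still occur.

import org.apache.spark.sql.types._

// hypothetical columns; "day" is the partition column
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("payload", StringType),
  StructField("day", StringType)
))

val df = spark.read.schema(schema).parquet("s3a://my-bucket/my-table")  // placeholder path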

Thanks.
Walid.


Re: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-02 Thread Walid Lezzar
This is great! I hope this JIRA gets resolved for the next version of Spark.

Thanks.

> On 2 Apr 2016, at 01:07, Saisai Shao <sai.sai.s...@gmail.com> wrote:
> 
> There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about it, 
> please take a look.
> 
> Thanks
> Saisai
> 
>> On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar <walez...@gmail.com> wrote:
>> Hi,
>> 
>> I looked into the Spark code to see how Spark reports metrics using the 
>> MetricsSystem class. I've seen that the MetricsSystem class, when 
>> instantiated, parses the metrics.properties file, looks up the sink class 
>> names and loads them dynamically. It would be great to implement my own 
>> sink by inheriting from the org.apache.spark.metrics.sink.Sink class, but 
>> unfortunately this class has been declared private[spark]! So it is not 
>> possible to inherit from it! Why is that? Is this going to change in future 
>> Spark versions?
> 


Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-01 Thread Walid Lezzar
Hi,

I looked into the Spark code to see how Spark reports metrics using the 
MetricsSystem class. I've seen that the MetricsSystem class, when 
instantiated, parses the metrics.properties file, looks up the sink class 
names and loads them dynamically. It would be great to implement my own sink 
by inheriting from the org.apache.spark.metrics.sink.Sink class, but 
unfortunately this class has been declared private[spark]! So it is not 
possible to inherit from it! Why is that? Is this going to change in future 
Spark versions? 
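
For context, a commonly mentioned workaround (a sketch only, with a hypothetical sink
name): private[spark] visibility extends to everything under the org.apache.spark
package, so a custom sink can be compiled into the org.apache.spark.metrics.sink
package. In the Spark 1.x/2.x line, sinks are loaded reflectively through a
(Properties, MetricRegistry, SecurityManager) constructor.

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import org.apache.spark.SecurityManager

// hypothetical sink that dumps all metrics to the console every 10 seconds
private[spark] class MyConsoleSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {

  private val reporter = ConsoleReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()

  override def start(): Unit = reporter.start(10, TimeUnit.SECONDS)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report()
}

It would then be wired up in metrics.properties with something like
*.sink.myconsole.class=org.apache.spark.metrics.sink.MyConsoleSink.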



[Spark 1.6] Spark Streaming - java.lang.AbstractMethodError

2016-01-07 Thread Walid LEZZAR
Hi,

We have been using Spark Streaming for a little while now.

Until now, we were running our Spark Streaming jobs on Spark 1.5.1 and they
were working well. Yesterday, we upgraded to Spark 1.6.0 without any changes
to the code, but our streaming jobs are not working any more. We are
getting an "AbstractMethodError". Please find the stack trace at the end
of this mail. Can we have some hints on what this error means? (We are
using Spark to connect to Kafka.)
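
For context, an AbstractMethodError thrown from StreamingListenerBus.onPostEvent
typically indicates a StreamingListener implementation (possibly one registered by the
Kafka consumer library in use) compiled against an older Spark version whose listener
interface has since gained new methods; rebuilding the job and any such library against
Spark 1.6.0 is the usual fix. A build sketch in sbt, with illustrative artifact names
and versions only:

// keep every Spark module (and any library that hooks into Spark Streaming
// internals) on the same Spark version as the cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
)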

The stack trace :
16/01/07 10:44:39 INFO ZkState: Starting curator service
16/01/07 10:44:39 INFO CuratorFrameworkImpl: Starting
16/01/07 10:44:39 INFO ZooKeeper: Initiating client connection,
connectString=localhost:2181 sessionTimeout=12
watcher=org.apache.curator.ConnectionState@2e9fa23a
16/01/07 10:44:39 INFO ClientCnxn: Opening socket connection to server
localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL
(unknown error)
16/01/07 10:44:39 INFO ClientCnxn: Socket connection established to
localhost/127.0.0.1:2181, initiating session
16/01/07 10:44:39 INFO ClientCnxn: Session establishment complete on server
localhost/127.0.0.1:2181, sessionid = 0x1521b6d262e0035, negotiated timeout
= 6
16/01/07 10:44:39 INFO ConnectionStateManager: State change: CONNECTED
16/01/07 10:44:40 INFO PartitionManager: Read partition information from:
/spark-kafka-consumer/StreamingArchiver/lbc.job.multiposting.input/partition_0
--> null
16/01/07 10:44:40 INFO JobScheduler: Added jobs for time 145215988 ms
16/01/07 10:44:40 INFO JobScheduler: Starting job streaming job
145215988 ms.0 from job set of time 145215988 ms
16/01/07 10:44:40 ERROR Utils: uncaught error in thread
StreamingListenerBus, stopping SparkContext

ERROR Utils: uncaught error in thread StreamingListenerBus, stopping
SparkContext
java.lang.AbstractMethodError
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:47)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26)
at
org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
at
org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
at
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
16/01/07 10:44:40 INFO JobScheduler: Finished job streaming job
145215988 ms.0 from job set of time 145215988 ms
16/01/07 10:44:40 INFO JobScheduler: Total delay: 0.074 s for time
145215988 ms (execution: 0.032 s)
16/01/07 10:44:40 ERROR JobScheduler: Error running job streaming job
145215988 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at
org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
at
org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
at
fr.leboncoin.morpheus.jobs.streaming.StreamingArchiver.lambda$run$ade930b4$1(StreamingArchiver.java:103)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at