Streaming : WAL ignored
Hi,

I have a Spark Streaming application running on YARN that consumes from a JMS source. I have checkpointing and the WAL enabled to ensure zero data loss. However, when I suddenly kill my application and restart it, sometimes it recovers the data from the WAL but sometimes it doesn't! In all cases, I can see the WAL written correctly on HDFS. Can someone explain why my WAL is sometimes ignored on restart? What are the conditions for Spark to decide whether or not to recover from the WAL?

Thanks,
Walid.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
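[A note on the recovery path, as a sketch: WAL replay only happens when the StreamingContext is rebuilt from the checkpoint directory through StreamingContext.getOrCreate. If the driver constructs a fresh context on restart (for example because the checkpoint path changed, or the context is built directly instead of via getOrCreate), the checkpoint and WAL are silently ignored. The path and batch interval below are hypothetical.]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalRecoveryApp {
  // Hypothetical checkpoint location; must be the same on every restart.
  val checkpointDir = "hdfs:///checkpoints/my-jms-app"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("jms-streaming")
      // WAL must be enabled for receiver-based sources:
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... build the JMS DStream and the processing graph here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recovery only happens on this call: if a valid checkpoint exists under
    // checkpointDir, Spark rebuilds the context from it and replays the WAL;
    // otherwise createContext() runs and nothing is recovered.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

[For zero data loss the receiver must also be reliable, i.e. acknowledge the JMS broker only after the WAL write. Note that any code change that makes the checkpoint non-deserializable causes Spark to fall back to a fresh start, which would match the intermittent behaviour described above.]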
Re: How to read the schema of a partitioned dataframe without listing all the partitions ?
I'm using Spark 2.3 with schema merge set to false. I don't think Spark is reading any file indeed, but it tries to list them all one by one and that is super slow on S3! Pointing to a single partition manually is not an option, as it requires me to be aware of the partitioning in order to add it to the path, and also, Spark doesn't include the partitioning column in that case.

> On 27 Apr 2018, at 16:07, Yong Zhang <java8...@hotmail.com> wrote:
>
> What version of Spark are you using?
>
> You can search "spark.sql.parquet.mergeSchema" on
> https://spark.apache.org/docs/latest/sql-programming-guide.html
>
> Starting from Spark 1.5, the default is already "false", which means Spark
> shouldn't scan all the parquet files to generate the schema.
>
> Yong
>
> From: Walid LEZZAR <walez...@gmail.com>
> Sent: Friday, April 27, 2018 7:42 AM
> To: spark users
> Subject: How to read the schema of a partitioned dataframe without listing
> all the partitions ?
>
> Hi,
>
> I have a parquet on S3 partitioned by day. I have 2 years of data (about
> 1000 partitions). With Spark, when I just want to know the schema of this
> parquet without even asking for a single row of data, Spark tries to list
> all the partitions and the nested partitions of the parquet, which makes it
> very slow just to build the dataframe object in Zeppelin.
>
> Is there a way to avoid that? Is there a way to tell Spark: "hey, just read
> a single partition and give me the schema of that partition and consider it
> as the schema of the whole dataframe"? (I don't care about schema merge,
> it's off by the way)
>
> Thanks.
> Walid.
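[One workaround, as a sketch with a hypothetical bucket layout: point Spark at a single partition directory while setting the `basePath` option. Spark then derives the partition column from the directory name without listing the whole table.]

```scala
// Assumed layout (hypothetical): s3://my-bucket/events/day=2018-04-27/...
val df = spark.read
  .option("basePath", "s3://my-bucket/events")        // root of the table
  .parquet("s3://my-bucket/events/day=2018-04-27")    // one known partition

// The schema now includes the partition column `day`, but only this one
// directory was listed on S3.
df.printSchema()
```

[This still requires knowing one valid partition value, but it addresses the objection above: with `basePath` set, the partition column is part of the schema.]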
How to read the schema of a partitioned dataframe without listing all the partitions ?
Hi,

I have a parquet on S3 partitioned by day. I have 2 years of data (about 1000 partitions). With Spark, when I just want to know the schema of this parquet without even asking for a single row of data, Spark tries to list all the partitions and the nested partitions of the parquet, which makes it very slow just to build the dataframe object in Zeppelin.

Is there a way to avoid that? Is there a way to tell Spark: "hey, just read a single partition and give me the schema of that partition and consider it as the schema of the whole dataframe"? (I don't care about schema merge, it's off by the way)

Thanks.
Walid.
Re: Spark Metrics : Why is the Sink class declared private[spark] ?
This is great! I hope this JIRA will be resolved in the next version of Spark.

Thanks.

> On 2 Apr 2016, at 01:07, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
> There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about it,
> please take a look.
>
> Thanks
> Saisai
>
>> On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar <walez...@gmail.com> wrote:
>> Hi,
>>
>> I looked into the Spark code at how Spark reports metrics using the
>> MetricsSystem class. I've seen that the Spark MetricsSystem class, when
>> instantiated, parses the metrics.properties file, tries to find the sink
>> class names and loads them dynamically. It would be great to implement my
>> own sink by inheriting from the org.apache.spark.metrics.sink.Sink class
>> but, unfortunately, this class has been declared private[spark]! So it is
>> not possible to inherit from it! Why is that? Is this going to change in
>> future Spark versions?
Spark Metrics : Why is the Sink class declared private[spark] ?
Hi,

I looked into the Spark code at how Spark reports metrics using the MetricsSystem class. I've seen that the Spark MetricsSystem class, when instantiated, parses the metrics.properties file, tries to find the sink class names and loads them dynamically. It would be great to implement my own sink by inheriting from the org.apache.spark.metrics.sink.Sink class but, unfortunately, this class has been declared private[spark]! So it is not possible to inherit from it! Why is that? Is this going to change in future Spark versions?
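[Until that JIRA lands, a common workaround is to declare the sink inside the org.apache.spark.metrics.sink package itself, so the private[spark] restriction does not apply. This is a sketch relying on an internal API that may change between releases; the class name, the `period` property, and the console reporter are all illustrative choices, and the three-argument constructor is the shape MetricsSystem invokes reflectively.]

```scala
package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import org.apache.spark.SecurityManager

// Hypothetical sink: periodically dumps all registered metrics to stdout.
class MyConsoleSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {

  // Poll period in seconds, read from metrics.properties (default: 10).
  private val pollPeriod =
    Option(property.getProperty("period")).map(_.toInt).getOrElse(10)

  private val reporter = ConsoleReporter.forRegistry(registry).build()

  override def start(): Unit = reporter.start(pollPeriod, TimeUnit.SECONDS)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report()
}
```

[It would then be wired up in metrics.properties, e.g. `*.sink.myconsole.class=org.apache.spark.metrics.sink.MyConsoleSink`.]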
[Spark 1.6] Spark Streaming - java.lang.AbstractMethodError
Hi,

We have been using Spark Streaming for a little while now. Until now, we were running our Spark Streaming jobs on Spark 1.5.1 and they were working well. Yesterday, we upgraded to Spark 1.6.0 without any changes in the code, but our streaming jobs are not working any more: we are getting an "AbstractMethodError". Please find the stack trace at the end of the mail. Can we have some hints on what this error means? (We are using Spark to connect to Kafka.)

The stack trace:

16/01/07 10:44:39 INFO ZkState: Starting curator service
16/01/07 10:44:39 INFO CuratorFrameworkImpl: Starting
16/01/07 10:44:39 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=12 watcher=org.apache.curator.ConnectionState@2e9fa23a
16/01/07 10:44:39 INFO ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
16/01/07 10:44:39 INFO ClientCnxn: Socket connection established to localhost/127.0.0.1:2181, initiating session
16/01/07 10:44:39 INFO ClientCnxn: Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1521b6d262e0035, negotiated timeout = 6
16/01/07 10:44:39 INFO ConnectionStateManager: State change: CONNECTED
16/01/07 10:44:40 INFO PartitionManager: Read partition information from: /spark-kafka-consumer/StreamingArchiver/lbc.job.multiposting.input/partition_0 --> null
16/01/07 10:44:40 INFO JobScheduler: Added jobs for time 145215988 ms
16/01/07 10:44:40 INFO JobScheduler: Starting job streaming job 145215988 ms.0 from job set of time 145215988 ms
16/01/07 10:44:40 ERROR Utils: uncaught error in thread StreamingListenerBus, stopping SparkContext
java.lang.AbstractMethodError
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:47)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
16/01/07 10:44:40 INFO JobScheduler: Finished job streaming job 145215988 ms.0 from job set of time 145215988 ms
16/01/07 10:44:40 INFO JobScheduler: Total delay: 0.074 s for time 145215988 ms (execution: 0.032 s)
16/01/07 10:44:40 ERROR JobScheduler: Error running job streaming job 145215988 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
    at fr.leboncoin.morpheus.jobs.streaming.StreamingArchiver.lambda$run$ade930b4$1(StreamingArchiver.java:103)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at
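[A sketch of the likely mechanism, not a confirmed diagnosis for this job: an AbstractMethodError raised inside StreamingListenerBus.onPostEvent usually means some StreamingListener on the classpath (here, plausibly one registered by the third-party Kafka consumer library) was compiled against an older Spark. With Scala 2.10/2.11, trait method bodies are copied into implementing classes at compile time, so a listener built against 1.5.1 does not contain the no-op defaults for the listener events Spark 1.6 dispatches, and the 1.6 listener bus fails when it posts one of the new events. Recompiling the listener (or upgrading the library that provides it) against Spark 1.6 is the usual fix. An illustrative listener:]

```scala
import org.apache.spark.streaming.scheduler._

// Illustrative listener. Compiled against Spark 1.6, this class inherits
// no-op bodies for every StreamingListener callback, including the
// output-operation events introduced in 1.6. The same source compiled
// against 1.5.1 would produce a class file missing those methods, and the
// 1.6 StreamingListenerBus would then throw java.lang.AbstractMethodError
// when posting one of the new events to it.
class BatchLogger extends StreamingListener {
  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    println(s"Batch done, total delay = ${batchCompleted.batchInfo.totalDelay}")
  }
}
```

[This would also explain why the failure appeared immediately after the 1.5.1 -> 1.6.0 upgrade with no code changes: the bytecode on the classpath, not the source, is what went stale.]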