Run Spark on Java 10
*my use case*: We run a Spark cluster on Mesos. Since our Mesos cluster also hosts other frameworks such as Storm and Cassandra, we have had incidents where a Spark job over-utilized CPU and caused resource contention with the other frameworks.

*objective*: run an un-modularized Spark application (the jar is compiled with a Java 8-compatible sbt compiler) on Java 10 to leverage Java 10's container support. Related link: https://bugs.openjdk.java.net/browse/JDK-8146115

After some reading about Java 9, this was my imagined *happy path*:

1) Point JAVA_HOME to Java 10.
2) Run my Spark job. On a ClassNotFoundException, look up the missing module in the Oracle documentation. For example, if the java.sql module (https://docs.oracle.com/javase/9/docs/api/java.sql-summary.html) is missing, add "spark.executor.extraJavaOptions --add-modules java.se.ee" to conf/spark-defaults.conf.
3) Repeat step 2 until no more exceptions are thrown.

However, I found this instead:

Warning: Local jar ***/java.se.ee does not exist, skipping.
java.lang.ClassNotFoundException: com.**.spark.Main
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:466)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:566)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:499)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:374)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:233)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:732)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This is what I observed after turning on verbose mode ...
Using properties file: /tmp/spark-2.2.2-bin-hadoop2.6/conf/spark-defaults.conf
Adding default property: spark.eventLog.enabled=true
Adding default property: spark.eventLog.dir=hdfs://***
Adding default property: spark.executor.extraJavaOptions=--add-modules java.se.ee
Parsed arguments:
  master                  mesos://localhost:10017
  deployMode              cluster
  executorMemory          16G
  executorCores           2
  totalExecutorCores      50
  propertiesFile          /tmp/spark-2.2.2-bin-hadoop2.6/conf/spark-defaults.conf
  driverMemory            4G
  driverCores             1
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               com.**.spark.Main
  primaryResource         ***.jar
  name                    ***
  childArgs               [***]
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark properties used, including those specified through --conf and those from the properties file /tmp/spark-2.2.2-bin-hadoop2.6/conf/spark-defaults.conf:
(spark.mesos.uris,hdfs:///***/tmpqIx6x2)
(spark.driver.memory,4G)
(spark.eventLog.enabled,true)
(spark.executor.extraJavaOptions,--add-modules java.se.ee)
(spark.executor.uri,***/spark-2.2.2-bin-hadoop2.6.tgz)
(spark.eventLog.dir,hdfs://***)

*The warning was printed here:*
https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala#L101
https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala#L76

After reading the source code, it seems to me that spark-submit does not understand the --add-modules option, so it treats java.se.ee as a jar file rather than a module. And I couldn't make it translate --add-modules the way I want when launching the executor JVM.
Has anyone done similar experiments running Spark on Java 9/10? Thanks in advance -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
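A possible workaround worth trying (untested here, so treat it as an assumption): the JVM also accepts the equals form of the flag, --add-modules=java.se.ee, which keeps the option and its value as a single token, leaving spark-submit nothing to split and misread as a jar path. In spark-defaults.conf that would look like:

```properties
# Sketch only: pass the module flag in single-token form to both the driver
# and executor JVMs. Whether spark-submit forwards these untouched on Mesos
# is exactly the open question in the message above.
spark.driver.extraJavaOptions=--add-modules=java.se.ee
spark.executor.extraJavaOptions=--add-modules=java.se.ee
```

Note that spark.executor.extraJavaOptions only affects the executor JVM; if the ClassNotFoundException happens in the driver, a driver-side option (or SPARK_SUBMIT_OPTS) would be needed as well.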
Re: Text from pdf spark
Yes, I can access the file using the CLI.

On Fri, Sep 28, 2018 at 1:24 PM kathleen li wrote:
> The error message is “file not found”.
> Are you able to use the following command line to access the file with the user you submitted the job as?
> hdfs dfs -ls /tmp/sample.pdf
Re: Text from pdf spark
The error message is “file not found”. Are you able to use the following command line to access the file with the user you submitted the job as?

hdfs dfs -ls /tmp/sample.pdf

Sent from my iPhone

On Sep 28, 2018, at 12:10 PM, Joel D wrote:
> I'm trying to extract text from pdf files in hdfs using pdfBox.
> However it throws an error:
>
> "Exception in thread "main" org.apache.spark.SparkException: ...
> java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf
> (No such file or directory)"
Text from pdf spark
I'm trying to extract text from pdf files in HDFS using pdfBox. However it throws an error:

"Exception in thread "main" org.apache.spark.SparkException: ...
java.io.FileNotFoundException: /nnAlias:8020/tmp/sample.pdf
(No such file or directory)"

What am I missing? Should I be working with the PortableDataStream instead of the String part of:
val files: RDD[(String, PortableDataStream)]?

def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession: SparkSession) = {
  val file: File = new File(fileNameFromRDD._1.drop(5))
  val document = PDDocument.load(file) // It throws an error here.

  if (!document.isEncrypted()) {
    val stripper = new PDFTextStripper()
    val text = stripper.getText(document)
    println("Text:" + text)
  }
  document.close()
}

// This is where I call the above pdf-to-text converter method.
val files = sparkSession.sparkContext.binaryFiles("hdfs://nnAlias:8020/tmp/sample.pdf")
files.foreach(println)

files.foreach(f => println(f._1))

files.foreach(fileStream => pdfRead(fileStream, sparkSession))

Thanks.
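The likely root cause: java.io.File only resolves local filesystem paths, so handing PDDocument.load(File) the path portion of an HDFS URI can never work, regardless of permissions; the bytes have to come through the PortableDataStream instead. A minimal local sketch of the failing premise (plain JDK, no Spark or PDFBox required; the path is the one from the stack trace):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

public class LocalFileDemo {
    public static void main(String[] args) throws Exception {
        // java.io.File knows nothing about HDFS: after drop(5) strips "hdfs:",
        // this is just a local path that does not exist on the driver/executor,
        // which is exactly what PDDocument.load(File) ends up seeing.
        File f = new File("/nnAlias:8020/tmp/sample.pdf");
        System.out.println(f.exists());

        try {
            new FileInputStream(f);
        } catch (FileNotFoundException e) {
            System.out.println("FileNotFoundException, as in the Spark job");
        }
    }
}
```

So yes, the PortableDataStream half of the pair is the one to use. PDDocument.load does have an InputStream overload, so something like PDDocument.load(fileNameFromRDD._2.open()) should read the bytes through Hadoop instead of the local filesystem (hedged: I have not run this against the cluster in question).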
Re: Need to convert Dataset to HashMap
Thanks for the help so far. I tried caching, but the operation seems to take forever. Any tips on how I can speed it up? Also, I am not sure a case class would work, since different files have different structures (I am parsing a 1 GB file right now, but there are a few other files that I also need to run this on).
Re: [Spark SQL] why spark sql hash() returns the same hash value though the keys/expr are not the same
Not sure I get what you mean… I ran the query that you had, and I don't get the same hash as you.

From: Gokula Krishnan D
Date: Friday, September 28, 2018 at 10:40 AM
To: "Thakrar, Jayesh"
Cc: user
Subject: Re: [Spark SQL] why spark sql hash() returns the same hash value though the keys/expr are not the same

Hello Jayesh,

I have masked the input values with .

Thanks & Regards,
Gokula Krishnan (Gokul)

On Wed, Sep 26, 2018 at 2:20 PM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:

Cannot reproduce your situation. Can you share your Spark version?

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("select hash('40514X'),hash('41751')").show()
+------------+-----------+
|hash(40514X)|hash(41751)|
+------------+-----------+
| -1898845883|  916273350|
+------------+-----------+

scala> spark.sql("select hash('14589'),hash('40004')").show()
+-----------+-----------+
|hash(14589)|hash(40004)|
+-----------+-----------+
|  777096871|-1593820563|
+-----------+-----------+

From: Gokula Krishnan D <email2...@gmail.com>
Date: Tuesday, September 25, 2018 at 8:57 PM
To: user <user@spark.apache.org>
Subject: [Spark SQL] why spark sql hash() returns the same hash value though the keys/expr are not the same

Hello All,

I am calculating the hash value of a few columns and determining whether a record is an Insert/Delete/Update, but found a scenario which is a little weird, since some of the records return the same hash value though the keys are totally different.

For instance,

scala> spark.sql("select hash('40514X'),hash('41751')").show()
+-----------+-----------+
|hash(40514)|hash(41751)|
+-----------+-----------+
|  976573657|  976573657|
+-----------+-----------+

scala> spark.sql("select hash('14589'),hash('40004')").show()
+-----------+-----------+
|hash(14589)|hash(40004)|
+-----------+-----------+
|  777096871|  777096871|
+-----------+-----------+

I do understand that hash() returns an integer; have these reached the max value?
Thanks & Regards, Gokula Krishnan (Gokul)
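For context on why such collisions would not be surprising in general: Spark SQL's hash() is Murmur3-based and returns a 32-bit int, so by the birthday bound collisions become likely long before anything "reaches the max value". A rough back-of-the-envelope in plain Java (no Spark needed; the 100,000-key figure is just an illustrative assumption, not from the thread):

```java
public class BirthdayBound {
    public static void main(String[] args) {
        // Approximate probability that at least two of n distinct keys collide
        // in a 32-bit hash space: p ~ 1 - exp(-n*(n-1) / (2 * 2^32)).
        long n = 100_000L;               // illustrative number of distinct keys
        double space = Math.pow(2, 32);  // size of the int hash range
        double p = 1.0 - Math.exp(-((double) n * (n - 1)) / (2.0 * space));
        System.out.printf("P(>=1 collision among %d keys) ~ %.2f%n", n, p);
        // With ~100k keys the chance of at least one collision already exceeds 50%.
    }
}
```

So for insert/update/delete detection, comparing on a 32-bit hash alone is risky by design; a wider digest (e.g. Spark SQL's sha2) or a comparison on the full key would be safer. That said, two specific five-character strings colliding while another pair also collides, as shown above, is unusual enough that the masking of the inputs may also be hiding what was actually compared.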
Re: [Spark SQL] why spark sql hash() returns the same hash value though the keys/expr are not the same
Hello Jayesh,

I have masked the input values with .

Thanks & Regards,
Gokula Krishnan (Gokul)

On Wed, Sep 26, 2018 at 2:20 PM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
> Cannot reproduce your situation. Can you share your Spark version?
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
>       /_/
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> spark.sql("select hash('40514X'),hash('41751')").show()
> +------------+-----------+
> |hash(40514X)|hash(41751)|
> +------------+-----------+
> | -1898845883|  916273350|
> +------------+-----------+
>
> scala> spark.sql("select hash('14589'),hash('40004')").show()
> +-----------+-----------+
> |hash(14589)|hash(40004)|
> +-----------+-----------+
> |  777096871|-1593820563|
> +-----------+-----------+
>
> From: Gokula Krishnan D
> Date: Tuesday, September 25, 2018 at 8:57 PM
> To: user
> Subject: [Spark SQL] why spark sql hash() returns the same hash value though the keys/expr are not the same
>
> Hello All,
>
> I am calculating the hash value of a few columns and determining whether a record is an Insert/Delete/Update, but found a scenario which is a little weird, since some of the records return the same hash value though the keys are totally different.
>
> For instance,
>
> scala> spark.sql("select hash('40514X'),hash('41751')").show()
> +-----------+-----------+
> |hash(40514)|hash(41751)|
> +-----------+-----------+
> |  976573657|  976573657|
> +-----------+-----------+
>
> scala> spark.sql("select hash('14589'),hash('40004')").show()
> +-----------+-----------+
> |hash(14589)|hash(40004)|
> +-----------+-----------+
> |  777096871|  777096871|
> +-----------+-----------+
>
> I do understand that hash() returns an integer; have these reached the max value?
>
> Thanks & Regards,
> Gokula Krishnan (Gokul)
Spark checkpointing
Hi,

Is there any way to read up on using Spark checkpointing (programmatically) in an in-depth manner?

I have an application where I perform multiple operations on a DStream. To my understanding, the result of those operations is a new DStream, which can be used for further operations. This leads to a chain of operations which can be described as follows:

1) Periodically read data from Kafka (checkpoint this DStream)
2) Create a window on the read data
3) Aggregate on the respective windows (checkpoint this DStream)
4) Write to Kafka (checkpoint this DStream)

In the above chain of operations the stream fails with the following error: "WindowedDStream has been marked for checkpointing but the storage level has not been set to enable persisting". I found out that a WindowedDStream does not support persisting data, to avoid unnecessary copies of the data.

Basically my issue is that I am not able to use a window operation in a checkpointed environment:
- calling checkpoint on all DStreams fails, because WindowedDStream does not support persisting
- calling checkpoint on all DStreams except the WindowedDStream fails, because somehow it is still marked for checkpointing.

Hopefully someone has an idea.

Kind regards
How to repartition Spark DStream Kafka ConsumerRecord RDD.
How do I repartition a Spark DStream Kafka ConsumerRecord RDD? I am getting uneven sizes of Kafka topics, and we want to repartition the input RDD based on some logic.

But when I try to apply the repartition I get an "object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)" error. I found the following workaround:
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

Call rdd.forEachPartition and create the NotSerializable object in there, like this:

rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();
  // ...Now process iter
});

Applied here:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParam)
).map(_.value())

stream.foreachRDD { rdd =>
  val repartitionRDD = flow.repartitionRDD(rdd, 1)
  println("&& repartitionRDD " + repartitionRDD.count())
  val modifiedRDD = rdd.mapPartitions { iter =>
    val customerRecords: List[ConsumerRecord[String, String]] = List[ConsumerRecord[String, String]]()
    while (iter.hasNext) {
      val consumerRecord: ConsumerRecord[String, String] = iter.next()
      // Note: this appends to an immutable List and discards the result,
      // so customerRecords stays empty; a mutable ListBuffer would be needed here.
      customerRecords :+ consumerRecord
    }
    customerRecords.iterator
  }
  val r = modifiedRDD.repartition(1)
  println("* after repartition " + r.count())
}

But I'm still getting the same object-not-serializable error. Any help is greatly appreciated.
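The rule being tripped over: repartition shuffles the RDD's elements, shuffled elements are serialized with Java serialization by default, and ConsumerRecord does not implement Serializable, so any shuffle of the records themselves fails no matter where the objects are created. A minimal JDK-only illustration of the same failure mode (the Event class is a made-up stand-in for ConsumerRecord, not a Kafka type):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializationDemo {
    // Stand-in for ConsumerRecord: a plain class with no Serializable marker.
    static class Event {
        final String key, value;
        Event(String key, String value) { this.key = key; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        Event e = new Event("k", "v");
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Throws NotSerializableException, just like shuffling ConsumerRecords does.
            out.writeObject(e);
        } catch (NotSerializableException nse) {
            System.out.println("NotSerializableException: " + nse.getMessage());
        }
    }
}
```

Which suggests the usual fix: map each ConsumerRecord down to its serializable parts (the key/value strings, offset, etc.) before repartitioning, i.e. repartition the .map(_.value()) stream rather than the raw record RDD inside foreachRDD.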
Re: Need to convert Dataset to HashMap
Hi,

Sorry, indeed you have to cache the dataset before the groupBy (otherwise it will be loaded from disk each time). For the case class you can have a look at the accepted answer here:
https://stackoverflow.com/questions/45017556/how-to-convert-a-simple-dataframe-to-a-dataset-spark-scala-with-case-class

Best regards,
Alessandro

On Fri, 28 Sep 2018 at 09:29, rishmanisation wrote:
> Thanks for the response! I'm not sure caching 'freq' would make sense, since there are multiple columns in the file and so it will need to be different for different columns.
>
> Original data format is .gz (gzip).
>
> I am a newbie to Spark, so could you please give a little more detail on the appropriate case class?
>
> Thanks!
Re: Need to convert Dataset to HashMap
Thanks for the response! I'm not sure caching 'freq' would make sense, since there are multiple columns in the file and so it would need to be different for different columns.

Original data format is .gz (gzip).

I am a newbie to Spark, so could you please give a little more detail on the appropriate case class?

Thanks!
Re: Need to convert Dataset to HashMap
Hi,

As a first attempt I would try to cache "freq", to be sure that the dataset is not re-loaded at each iteration later on. By the way, what's the original data format you are importing from? I suspect also that an appropriate case class rather than Row would help as well, instead of converting to String and parsing it "manually".

Hth,
Alessandro

On Fri, 28 Sep 2018 at 01:48, rishmanisation wrote:
> I am writing a data-profiling application that needs to iterate over a large .gz file (imported as a Dataset<Row>). Each key-value pair in the hashmap will be the row value and the number of times it occurs in the column. There is one hashmap for each column, and they are all added to a JSON at the end.
>
> For now, I am using the following logic to generate the hashmap for a column:
>
> Dataset<Row> freq = df
>     .groupBy(columnName)
>     .count();
>
> HashMap<String, String> myHashMap = new HashMap<>();
>
> Iterator<Row> rowIterator = freq.toLocalIterator();
> while (rowIterator.hasNext()) {
>     Row currRow = rowIterator.next();
>     String rowString = currRow.toString();
>     String[] contents = rowString.substring(1, rowString.length() - 1).split(",");
>     Double percent = Long.valueOf(contents[1]) * 100.0 / numOfRows;
>     myHashMap.put(contents[0], Double.toString(percent));
> }
>
> I have also tried converting to an RDD and using the collectAsMap() function, but both of these are taking a very long time (about 5 minutes per column, where each column has approx. 30 million rows). Is there a more efficient way to achieve the same?
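On the "more efficient way" question: part of the per-row cost in the quoted loop is the Row.toString() plus split(",") round-trip, which also breaks on values that themselves contain commas; the typed accessors row.getString(0) and row.getLong(1) avoid both problems. The percentage bookkeeping itself is trivial. Here is the same logic in plain Java over an in-memory stand-in for the (value, count) rows, just to pin down what the loop computes (the data and names are mine, not from the original; in Spark the outer loop would walk freq.toLocalIterator()):

```java
import java.util.HashMap;
import java.util.Map;

public class FrequencyMap {
    public static void main(String[] args) {
        // Stand-in for the (value, count) pairs that freq.toLocalIterator() yields.
        Object[][] valueCounts = { {"a", 30L}, {"b", 50L}, {"c", 20L} };
        long numOfRows = 100L;

        Map<String, String> percentByValue = new HashMap<>();
        for (Object[] row : valueCounts) {
            String value = (String) row[0];  // row.getString(0) in Spark
            long count = (Long) row[1];      // row.getLong(1) in Spark
            double percent = count * 100.0 / numOfRows;
            percentByValue.put(value, Double.toString(percent));
        }
        System.out.println(percentByValue); // {a=30.0, b=50.0, c=20.0}
    }
}
```

That said, most of the five minutes per column is probably the groupBy shuffle plus toLocalIterator pulling partitions to the driver one at a time, so caching df once and profiling all columns from the cached dataset (as suggested above) is likely to matter more than the parsing.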