Re: Append more files to existing partitioned data
Please check out org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
and org.apache.spark.sql.execution.datasources.WriteRelation.
I guess it's managed by:

job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)

On 17 March 2018 at 20:46, Serega Sheypak <serega.shey...@gmail.com> wrote:

> Hi Denis, great to see you here :)
> It works, thanks!
>
> Do you know how Spark generates datafile names? Names look like part-
> with a UUID appended after:
>
> part-0-124a8c43-83b9-44e1-a9c4-dcc8676cdb99.c000.snappy.parquet
>
> 2018-03-17 14:15 GMT+01:00 Denis Bolshakov <bolshakov.de...@gmail.com>:
>
>> Hello Serega,
>>
>> https://spark.apache.org/docs/latest/sql-programming-guide.html
>>
>> Please try the SaveMode.Append option. Does it work for you?
>>
>> Sat, 17 Mar 2018, 15:19 Serega Sheypak <serega.shey...@gmail.com>:
>>
>>> Hi, I'm using spark-sql to process my data and store the result as parquet,
>>> partitioned by several columns:
>>>
>>> ds.write
>>>   .partitionBy("year", "month", "day", "hour", "workflowId")
>>>   .parquet("/here/is/my/dir")
>>>
>>> I want to run more jobs that will produce new partitions or add more
>>> files to existing partitions.
>>> What is the right way to do it?
>>>
>>
>
--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
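The file name in the question above can be reproduced mechanically: Spark's datasource writers combine a task/partition index, a job-wide UUID (the DATASOURCE_WRITEJOBUUID mentioned above), a per-task file counter, and the codec/format suffix. A minimal sketch of that naming scheme, purely illustrative — the exact format varies between Spark versions, and the `part-0-` in the quoted email is likely a truncated `part-00000-`:

```python
import re
import uuid

def part_file_name(task_id: int, job_uuid: uuid.UUID, file_count: int = 0,
                   codec: str = "snappy", fmt: str = "parquet") -> str:
    """Build a part-file name in the style of Spark's datasource writers.

    Illustrative sketch only: the real logic lives in Spark's
    InsertIntoHadoopFsRelationCommand / FileFormatWriter and differs
    slightly across versions.
    """
    return f"part-{task_id:05d}-{job_uuid}.c{file_count:03d}.{codec}.{fmt}"

# The UUID from the thread's example file name:
name = part_file_name(0, uuid.UUID("124a8c43-83b9-44e1-a9c4-dcc8676cdb99"))
print(name)
# prints: part-00000-124a8c43-83b9-44e1-a9c4-dcc8676cdb99.c000.snappy.parquet

pattern = re.compile(r"part-\d{5}-[0-9a-f-]{36}\.c\d{3}\.snappy\.parquet")
assert pattern.fullmatch(name)
```

Because the UUID is minted once per write job, part files from different append jobs never collide in the same partition directory.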
Re: Append more files to existing partitioned data
Hello Serega,

https://spark.apache.org/docs/latest/sql-programming-guide.html

Please try the SaveMode.Append option. Does it work for you?

Sat, 17 Mar 2018, 15:19 Serega Sheypak:

> Hi, I'm using spark-sql to process my data and store the result as parquet,
> partitioned by several columns:
>
> ds.write
>   .partitionBy("year", "month", "day", "hour", "workflowId")
>   .parquet("/here/is/my/dir")
>
> I want to run more jobs that will produce new partitions or add more files
> to existing partitions.
> What is the right way to do it?
>
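Why SaveMode.Append answers the question can be shown with a toy model of a partitioned output directory (plain Python, no Spark — the dictionary stands in for the directory tree; partition values and file names are made up for illustration): each append job only adds new part-files, leaving existing partition directories and files alone, while overwrite would clear them.

```python
import uuid

def write_partitioned(store: dict, rows, mode: str = "error"):
    """Toy model of a partitioned write: store maps partition path -> part files.

    Mimics Spark's save modes: 'error' fails on an existing path,
    'overwrite' clears it, 'append' adds files alongside existing ones.
    """
    if mode not in ("append", "overwrite", "error"):
        raise ValueError(mode)
    if mode == "error" and store:
        raise RuntimeError("path already exists")
    if mode == "overwrite":
        store.clear()
    job_id = uuid.uuid4()  # one UUID per write job, as in Spark's part-file names
    for task_id, (year, month) in enumerate(rows):
        partition = f"year={year}/month={month}"
        store.setdefault(partition, []).append(
            f"part-{task_id:05d}-{job_id}.snappy.parquet")

store = {}
write_partitioned(store, [(2018, 3)], mode="error")              # first job
write_partitioned(store, [(2018, 3), (2018, 4)], mode="append")  # later job

# The existing partition kept its old file and gained a new one,
# and a brand-new partition appeared alongside it.
assert len(store["year=2018/month=3"]) == 2
assert len(store["year=2018/month=4"]) == 1
```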
Re: Pasting into spark-shell doesn't work for Databricks example
Hello Zhu,

Thank you very much for such a detailed explanation and for providing the
workaround; it works fine.

But since the problem is related to a Scala issue, can we expect a fix in
Spark 2.0? Or is it not a good idea to update such an important dependency
as Scala in a minor maintenance release?

Kind regards,
Denis

On 22 November 2016 at 22:13, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> The workaround is defining the imports and class together using ":paste".
>
> On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
>> This relates to a known issue:
>> https://issues.apache.org/jira/browse/SPARK-14146 and
>> https://issues.scala-lang.org/browse/SI-9799
>>
>> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak <bolshakov.de...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> We have the same issue.
>>>
>>> We use the latest release, 2.0.2.
>>>
>>> The setup with 1.6.1 works fine.
>>>
>>> Could somebody provide a workaround for this?
>>>
>>> Kind regards,
>>> Denis
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113p28116.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>
--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
Re: Pasting into spark-shell doesn't work for Databricks example
> StructField("product", DoubleType) :: Nil
> ^
> :23: error: not found: type DataType
> override def dataType: DataType = DoubleType
> ^
> :23: error: not found: value DoubleType
> override def dataType: DataType = DoubleType
> ^
> :28: error: not found: type MutableAggregationBuffer
> override def initialize(buffer: MutableAggregationBuffer): Unit = {
> ^
> :34: error: not found: type MutableAggregationBuffer
> override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> ^
> :34: error: not found: type Row
> override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> ^
> :40: error: not found: type MutableAggregationBuffer
> override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> ^
> :40: error: not found: type Row
> override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> ^
> :46: error: not found: type Row
> override def evaluate(buffer: Row): Any = {
> ^
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
Re: spark with kerberos
The problem happens when writing (reading works fine):

rdd.saveAsNewAPIHadoopFile

We use just RDD and HDFS, no other things.

Spark version 1.6.1.
`Cluster A` - CDH 5.7.1
`Cluster B` - vanilla hadoop 2.6.5
`Cluster C` - CDH 5.8.0

Best regards,
Denis

On 13 October 2016 at 13:06, ayan guha <guha.a...@gmail.com> wrote:

> And a little more detail on Spark version, hadoop version and
> distribution would also help...
>
> On Thu, Oct 13, 2016 at 9:05 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> I think one point you need to mention is your target - HDFS, Hive or
>> HBase (or something else) and which end points are used.
>>
>> On Thu, Oct 13, 2016 at 8:50 PM, dbolshak <bolshakov.de...@gmail.com> wrote:
>>
>>> Hello community,
>>>
>>> We have a challenge and no ideas how to solve it.
>>>
>>> The problem:
>>>
>>> Say we have the following environment:
>>> 1. `cluster A`, the cluster does not use kerberos and we use it as a source
>>> of data; the important thing is - we don't manage this cluster.
>>> 2. `cluster B`, a small cluster where our spark application is running and
>>> performing some logic. (we manage this cluster and it does not have
>>> kerberos).
>>> 3. `cluster C`, the cluster uses kerberos and we use it to keep the results
>>> of our spark application; we manage this cluster.
>>>
>>> Our requirements and conditions that are not mentioned yet:
>>> 1. All clusters are in a single data center, but in different subnetworks.
>>> 2. We cannot turn on kerberos on `cluster A`.
>>> 3. We cannot turn off kerberos on `cluster C`.
>>> 4. We can turn kerberos on/off on `cluster B`; currently it's turned off.
>>> 5. The Spark app is built on top of RDD and does not depend on spark-sql.
>>>
>>> Does anybody know how to write data using the RDD api to a remote cluster
>>> which is running with Kerberos?
>>>
>>> --
>>> //with Best Regards
>>> --Denis Bolshakov
>>> e-mail: bolshakov.de...@gmail.com
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-with-kerberos-tp27894.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --
> Best Regards,
> Ayan Guha
>

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
Spark with kerberos
Hello community,

We have a challenge and no ideas how to solve it.

The problem:

Say we have the following environment:
1. `cluster A`, the cluster does not use kerberos and we use it as a source
of data; the important thing is - we don't manage this cluster.
2. `cluster B`, a small cluster where our spark application is running and
performing some logic. (we manage this cluster and it does not have kerberos).
3. `cluster C`, the cluster uses kerberos and we use it to keep the results of
our spark application; we manage this cluster.

Our requirements and conditions that are not mentioned yet:
1. All clusters are in a single data center, but in different subnetworks.
2. We cannot turn on kerberos on `cluster A`.
3. We cannot turn off kerberos on `cluster C`.
4. We can turn kerberos on/off on `cluster B`; currently it's turned off.
5. The Spark app is built on top of RDD and does not depend on spark-sql.

Does anybody know how to write data using the RDD api to a remote cluster
which is running with Kerberos?

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
Re: one executor runs multiple parallel tasks VS multiple executors each running one task
Look here:
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

Probably it will help a bit.

Best regards,
Denis

On 11 Oct 2016 at 23:49, "Xiaoye Sun" wrote:

> Hi,
>
> Currently, I am running Spark using the standalone scheduler with 3
> machines in our cluster. Of these three machines, one runs the Spark Master
> and the other two run Spark Workers.
>
> We run a machine learning application on this small-scale testbed. A
> particular stage in my application is divided into 10 parallel tasks, so I
> want to know the pros and cons of different cluster configurations.
>
> Conf 1: Multiple executors, each of which runs one task.
> Each worker has 5 executors; each of the executors has 1 CPU core. In such a
> configuration, the scheduler will give one task to each of the executors.
> Each of the tasks probably runs in a different JVM.
>
> Conf 2: One executor running multiple tasks.
> Each worker has only one executor; each executor has 5 CPU cores. In such a
> case, the scheduler will give 5 tasks to each executor. Tasks running in
> the same executor probably run in the same process but in different threads.
>
> I think in many cases Conf 2 is preferable to Conf 1, since tasks in
> the same executor can share the block manager, so data shared among these
> tasks doesn't need to be transferred multiple times (e.g. the broadcast
> data). However, I am wondering if there is a scenario where Conf 1 is
> preferable, and does the same conclusion hold when the scheduler is YARN or
> Mesos?
>
> Thanks!
>
> Best,
> Xiaoye
>
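The two configurations in the question can be compared with a little arithmetic; a toy sketch (plain Python, no Spark required — the worker/executor/core counts are the ones from the question, and the "one broadcast copy per executor JVM" rule is the standard Spark behavior the question alludes to):

```python
def cluster_shape(workers: int, executors_per_worker: int, cores_per_executor: int):
    """Summarize how many JVMs and concurrent task slots a configuration yields.

    Toy model of Spark standalone sizing: one JVM per executor, one task
    slot per core. Broadcast variables are materialized once per executor,
    so fewer executors means fewer broadcast copies.
    """
    executor_jvms = workers * executors_per_worker
    task_slots = executor_jvms * cores_per_executor
    broadcast_copies = executor_jvms  # one copy per executor JVM
    return {"jvms": executor_jvms, "slots": task_slots,
            "broadcast_copies": broadcast_copies}

conf1 = cluster_shape(workers=2, executors_per_worker=5, cores_per_executor=1)
conf2 = cluster_shape(workers=2, executors_per_worker=1, cores_per_executor=5)

# Both configurations run the 10 tasks in one wave, but Conf 1 pays for
# 10 JVMs and 10 broadcast copies, while Conf 2 uses 2 of each.
assert conf1["slots"] == conf2["slots"] == 10
assert conf1["jvms"] == 10 and conf2["jvms"] == 2
assert conf2["broadcast_copies"] < conf1["broadcast_copies"]
```

This is why Conf 2 is usually preferred; Conf 1 mainly helps when one task can blow up a shared JVM (GC pressure, memory isolation).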
Re: Spark Docker Container - Jars problem when deploying my app
Try to build a flat (uber) jar which includes all dependencies.

On 11 Oct 2016 at 22:11, "doruchiulan" wrote:

> Hi,
>
> I have a problem that's been bothering me for a few days, and I'm pretty out
> of ideas.
>
> I built a Spark docker container where Spark runs in standalone mode. Both
> master and worker are started there.
>
> Now I tried to deploy my Spark Scala App in a separate container (same
> machine) where I pass the Spark master URL and other stuff I need to connect
> to Spark. The connection is seamless.
>
> The first problem I encountered was:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
> Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4):
> java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka.KafkaRDDPartition
>
> Then I made a folder of my dependencies except Spark, put them in a folder
> alongside my app JAR file, and added them to SparkConf using
> SparkConf.setJars.
>
> Now the strange thing happens:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
> Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException:
> cannot assign instance of scala.collection.immutable.List$SerializationProxy
> to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of
> type scala.collection.Seq in instance of
> org.apache.spark.rdd.MapPartitionsRDD
>
> More than this, if I just run the Scala app locally using java -cp
> myApp.jar, it works perfectly and the jobs run ok.
>
> I don't have any SPARK_HOME locally, and setJars basically takes an empty
> list, as if I won't use it.
>
> I guess it uses the jars provided on the classpath when I run my app, and I
> don't need to provide anything else.
>
> If any of you guys have any ideas I would be grateful; I really can't
> explain to myself why this doesn't work, and I haven't done any Spark
> deployments until now. I mostly ran embedded Spark.
>
> Spark is the same version in my app dependencies (2.0.0) as the one running
> in the docker container.
> I used Scala 2.11.7 for my app.
> Java 1.8 on both containers (app, spark).
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Docker-Container-Jars-problem-when-deploying-my-app-tp27878.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
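Real builds produce the flat (uber) jar suggested above with sbt-assembly or the Maven Shade plugin. Since a jar is just a zip archive, the core idea — merge every dependency's entries into one archive so classes like KafkaRDDPartition travel with the app — can be sketched in a few lines of plain Python (illustrative only; a real assembly plugin also handles duplicate-entry merge strategies, META-INF signatures, and service files):

```python
import io
import zipfile

def build_uber_jar(dep_jars: list) -> bytes:
    """Merge the entries of several jar (zip) archives into one flat jar.

    Toy sketch of what sbt-assembly / maven-shade do; first-one-wins on
    duplicate entry names, no manifest merging or signature handling.
    """
    out_buf = io.BytesIO()
    with zipfile.ZipFile(out_buf, "w") as uber:
        seen = set()
        for jar_bytes in dep_jars:
            with zipfile.ZipFile(io.BytesIO(jar_bytes)) as dep:
                for entry in dep.namelist():
                    if entry not in seen:
                        uber.writestr(entry, dep.read(entry))
                        seen.add(entry)
    return out_buf.getvalue()

def make_jar(entries: dict) -> bytes:
    """Helper: build a tiny fake jar from a name -> bytes mapping."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        for name, data in entries.items():
            z.writestr(name, data)
    return buf.getvalue()

# Hypothetical app jar plus the dependency whose class was missing above.
app = make_jar({"com/example/App.class": b"app"})
kafka = make_jar({"org/apache/spark/streaming/kafka/KafkaRDDPartition.class": b"dep"})

uber = build_uber_jar([app, kafka])
names = set(zipfile.ZipFile(io.BytesIO(uber)).namelist())
assert "org/apache/spark/streaming/kafka/KafkaRDDPartition.class" in names
assert "com/example/App.class" in names
```

Shipping one such jar (with Spark itself marked `provided` so the cluster's own copy is used) avoids both the ClassNotFoundException and the version-mismatch ClassCastException described above.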
Re: Partition n keys into exactly n partitions
Just provide your own partitioner. Once I wrote a partitioner which keeps
similar keys together in one partition.

Best regards,
Denis

On 12 September 2016 at 19:44, sujeet jog <sujeet@gmail.com> wrote:

> Hi,
>
> Is there a way to partition a set of data with n keys into exactly n
> partitions?
>
> For example:
>
> a tuple of 1008 rows with key x,
> a tuple of 1008 rows with key y, and so on, for a total of 10 keys (x, y, etc.)
>
> Total records = 10080
> NumOfKeys = 10
>
> I want to partition the 10080 elements into exactly 10 partitions, with
> each partition holding the elements of one unique key.
>
> Is there a way to make this happen? Any ideas on implementing a custom
> partitioner?
>
> The current partitioner I'm using is HashPartitioner, with which there are
> cases where key.hashCode() % numPartitions becomes the same for keys x and y,
> hence many elements with different keys fall into a single partition at
> times.
>
> Thanks,
> Sujeet
>

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
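The custom-partitioner idea can be sketched without Spark: first see why `hashCode % numPartitions` collides, then assign each distinct key its own partition index up front. In Scala you would put the same lookup inside `getPartition` of a subclass of `org.apache.spark.Partitioner`; in PySpark the function itself could be passed as the `partitionFunc` argument of `rdd.partitionBy`. The keys below are hypothetical stand-ins:

```python
# hash % numPartitions can send different keys to the same partition:
num_partitions = 10
assert hash(3) % num_partitions == hash(13) % num_partitions  # both land in 3

# Exact partitioning: give each distinct key a unique index up front.
def make_exact_partitioner(keys):
    """Return a key -> partition function assigning each key its own partition."""
    index = {k: i for i, k in enumerate(sorted(keys))}
    return lambda key: index[key]

keys = ["x", "y", "z", "a", "b", "c", "d", "e", "f", "g"]  # 10 distinct keys
part = make_exact_partitioner(keys)

partitions = {part(k) for k in keys}
assert partitions == set(range(10))  # 10 keys -> exactly 10 distinct partitions
```

The prerequisite is knowing the full key set before the shuffle (here it is passed in explicitly); with 10080 records over 10 keys, each of the 10 partitions then holds exactly the 1008 rows of one key.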
Re: Spark tasks block randomly on standalone cluster
Hello,

I see such behavior from time to time. A similar problem is described here:
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-Memory-Task-hangs-td12377.html

We also use speculation as a workaround (our spark version is 1.6.0).

But I would like to share one observation. We have two jenkins instances:
one uses Java 7 and the other Java 8. And sometimes I see the issue during
integration testing on the jenkins with Java 7 (and never on Java 8).

So I really hope that the issue will disappear after we complete our Java
migration.

Which Java version do you use?

Best regards,
Denis

On 12 September 2016 at 15:31, bogdanbaraila <bogdanbara...@gmail.com> wrote:

> We have a quite complex application that runs on Spark Standalone.
> In some cases the tasks from one of the workers block randomly for an
> infinite amount of time in the RUNNING state.
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27693/SparkStandaloneIssue.png>
>
> Extra info:
> - there aren't any errors in the logs
> - I ran with the logger in debug and didn't see any relevant messages (I see
> when the task starts but then there is no activity for it)
> - the jobs work ok if I have only 1 worker
> - the same job may execute a second time without any issues, in a proper
> amount of time
> - I don't have any really big partitions that could cause delays for some
> of the tasks
> - in spark 2.0 I've moved from RDD to Datasets and I have the same issue
> - in spark 1.4 I was able to overcome the issue by turning on speculation,
> but in spark 2.0 the blocking tasks are from different workers (while in 1.4
> I had blocking tasks on only 1 worker), so speculation isn't fixing my issue
> - I have the issue in more environments, so I don't think it's hardware
> related
>
> Did anyone experience something similar? Any suggestions on how I could
> identify the issue?
>
> Thanks a lot!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-tasks-blockes-randomly-on-standalone-cluster-tp27693.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
Re: Creating RDD using swebhdfs with truststore
Hello,

I would also set the Java opts for the driver.

Best regards,
Denis

On 4 Sep 2016 at 0:31, "Sourav Mazumder" <sourav.mazumde...@gmail.com> wrote:

> Hi,
>
> I am trying to create an RDD by using swebhdfs to a remote hadoop cluster
> which is protected by Knox and uses SSL.
>
> The code looks like this -
>
> sc.textFile("swebhdfs:/host:port/gateway/default/webhdfs/v1/").count
>
> I'm passing the truststore and truststore password through extra java
> options while starting the spark shell as -
>
> spark-shell --conf
> "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=truststor.jks
> -Djavax.net.ssl.trustStorePassword=" --conf
> "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=truststore.jks
> -Djavax.net.ssl.trustStorePassword="
>
> But I'm always getting the error -
>
> Name: javax.net.ssl.SSLHandshakeException
> Message: Remote host closed connection during handshake
>
> Am I passing the truststore and truststore password in the right way?
>
> Regards,
>
> Sourav
>
Re: After calling persist, why the size in the Spark UI does not match the actual file size
Hello,

Spark uses snappy by default - is your original file compressed? Also, Spark
keeps data in its own representation format (column-based), which is not the
same as text.

Best regards,
Denis

On 29 August 2016 at 16:52, Rohit Kumar Prusty <rohit_pru...@infosys.com> wrote:

> Hi Team,
>
> I am new to spark and have this basic question. After calling persist, why
> does the size in the Spark UI not match the actual file size?
>
> Actual file size for "/user/rohit_prusty/application2.log" - *39 KB*
>
> Code snippet:
> =============
>
> logData = sc.textFile("/user/rohit_prusty/application2.log")
> logData.persist()
> logData.count()
> errors = logData.filter(lambda line: "ERROR" in line)
> errors.persist()
> errors.count()
>
> Output in SparkUI
> =================
>
> logData RDD takes *2.1 KB*
> errors RDD takes *1.3 KB*
>
> Regards,
> Rohit Kumar Prusty
> +91-9884070075
>

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
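The gap between the 39 KB file and the ~2 KB cached size is plausible once compression enters the picture: application logs are highly repetitive, so a general-purpose codec shrinks them dramatically. A quick stdlib illustration, using zlib rather than snappy (snappy is not in the Python standard library; the fake log content is made up, and the point is only the ratio, not the exact codec):

```python
import zlib

# Fake a repetitive application log, the kind of input that compresses well.
log_lines = [f"2016-08-29 16:52:{i % 60:02d} INFO  Request handled in {i % 97} ms"
             for i in range(500)]
raw = "\n".join(log_lines).encode("utf-8")

compressed = zlib.compress(raw)

ratio = len(compressed) / len(raw)
print(f"raw: {len(raw)} bytes, compressed: {len(compressed)} bytes, "
      f"ratio: {ratio:.2f}")

# Repetitive text routinely shrinks by several times, which is in line
# with 39 KB on disk vs ~2 KB reported for the cached RDD in the UI.
assert len(compressed) < len(raw) / 4
```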
Re: How to access the WrappedArray
Hello,

Not sure that it will help, but I would do the following:

1. Create a case class (or Java bean) which matches your json schema.
2. Change the following line:

old:
Dataset rows_salaries = spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json");

new:
Dataset<MyCaseClass> rows_salaries = spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json").as(Encoders.bean(MyCaseClass.class));

3. Make your code compile successfully.

BR,
Denis

On 29 August 2016 at 12:27, Sree Eedupuganti <s...@inndata.in> wrote:

> Here is the snippet of code:
>
> // The entry point into all functionality in Spark is the SparkSession
> // class. To create a basic SparkSession, just use SparkSession.builder():
>
> SparkSession spark = SparkSession.builder().appName("Java Spark SQL
> Example").master("local").getOrCreate();
>
> // With a SparkSession, applications can create DataFrames from an existing
> // RDD, from a Hive table, or from Spark data sources.
>
> Dataset rows_salaries = spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json");
>
> // Register the DataFrame as a SQL temporary view
>
> rows_salaries.createOrReplaceTempView("salaries");
>
> // SQL statements can be run by using the sql methods provided by spark
>
> List df = spark.sql("select * from salaries").collectAsList();
>
> for (Row r : df) {
>   if (r.get(0) != null)
>     System.out.println(r.get(0).toString());
> }
>
> Actual Output:
>
> WrappedArray(WrappedArray(1, B9B42DE1-E810-4489-9735-B365A47A4012, 1,
> 1467358044, 697390, 1467358044, 697390, null, Aaron,Patricia G,
> Facilities/Office Services II, A03031, OED-Employment Dev (031),
> 1979-10-24T00:00:00, 56705.00, 54135.44))
>
> Expected Output:
>
> Need the elements from the WrappedArray
>
> Below you can find the attachment of the .json file
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

--
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com
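The printed value above is a nested sequence: the outer WrappedArray is the whole `data` column, and each inner WrappedArray is one salary record. Instead of printing the outer object, index into it (in the Java code, `Row.getList(0)` yields the outer sequence as a `java.util.List`). The unwrapping can be sketched in plain Python using the values visible in the output — the field positions are an assumption inferred from that single record:

```python
# Model of the nested structure printed in the thread:
# data = WrappedArray(WrappedArray(<one salary record>))
data = [
    ["1", "B9B42DE1-E810-4489-9735-B365A47A4012", "1", "1467358044",
     "697390", "1467358044", "697390", None, "Aaron,Patricia G",
     "Facilities/Office Services II", "A03031", "OED-Employment Dev (031)",
     "1979-10-24T00:00:00", "56705.00", "54135.44"],
]

# Unwrap: iterate the outer sequence, then index into each inner record.
# Field positions are inferred from the single record above (assumption).
NAME, JOB_TITLE, ANNUAL_SALARY = 8, 9, 13

for record in data:
    print(record[NAME], "|", record[JOB_TITLE], "|", record[ANNUAL_SALARY])
# prints: Aaron,Patricia G | Facilities/Office Services II | 56705.00
```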