Re: Append more files to existing partitioned data

2018-03-18 Thread Denis Bolshakov
Please check out

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand


and

org.apache.spark.sql.execution.datasources.WriteRelation


I guess it's managed by

job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
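
Roughly speaking, the per-task file name is assembled like this (a sketch,
not the exact Spark source):

// split = partition/task id, jobUUID = the unique write-job id set above,
// fileCount = per-task output file counter
def partFileName(split: Int, jobUUID: String, fileCount: Int): String =
  f"part-$split%05d-$jobUUID.c$fileCount%03d.snappy.parquet"

// partFileName(0, "124a8c43-83b9-44e1-a9c4-dcc8676cdb99", 0) returns
// part-00000-124a8c43-83b9-44e1-a9c4-dcc8676cdb99.c000.snappy.parquet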


On 17 March 2018 at 20:46, Serega Sheypak <serega.shey...@gmail.com> wrote:

> Hi Denis, great to see you here :)
> It works, thanks!
>
> Do you know how Spark generates data file names? The names look like part-
> with a UUID appended after:
>
> part-0-124a8c43-83b9-44e1-a9c4-dcc8676cdb99.c000.snappy.parquet
>
>
>
>
> 2018-03-17 14:15 GMT+01:00 Denis Bolshakov <bolshakov.de...@gmail.com>:
>
>> Hello Serega,
>>
>> https://spark.apache.org/docs/latest/sql-programming-guide.html
>>
>> Please try the SaveMode.Append option. Does it work for you?
>>
>>
>> On Sat, 17 Mar 2018 at 15:19, Serega Sheypak <serega.shey...@gmail.com> wrote:
>>
>>> Hi, I'm using spark-sql to process my data and store the result as parquet
>>> partitioned by several columns
>>>
>>> ds.write
>>>   .partitionBy("year", "month", "day", "hour", "workflowId")
>>>   .parquet("/here/is/my/dir")
>>>
>>>
>>> I want to run more jobs that will produce new partitions or add more
>>> files to existing partitions.
>>> What is the right way to do it?
>>>
>>
>


-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: Append more files to existing partitioned data

2018-03-17 Thread Denis Bolshakov
Hello Serega,

https://spark.apache.org/docs/latest/sql-programming-guide.html

Please try the SaveMode.Append option. Does it work for you?
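
For example, something like this (a sketch based on your snippet;
SaveMode.Append is the only change):

import org.apache.spark.sql.SaveMode

// Appends new files/partitions under the same base directory instead of
// failing because the directory already exists.
ds.write
  .mode(SaveMode.Append)
  .partitionBy("year", "month", "day", "hour", "workflowId")
  .parquet("/here/is/my/dir")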


On Sat, 17 Mar 2018 at 15:19, Serega Sheypak wrote:

> Hi, I'm using spark-sql to process my data and store the result as parquet
> partitioned by several columns
>
> ds.write
>   .partitionBy("year", "month", "day", "hour", "workflowId")
>   .parquet("/here/is/my/dir")
>
>
> I want to run more jobs that will produce new partitions or add more files
> to existing partitions.
> What is the right way to do it?
>


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Denis Bolshakov
Hello Zhu,

Thank you very much for such a detailed explanation and for providing a
workaround; it works fine.
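
For anyone else hitting this, the workaround in spark-shell looks roughly
like this (a sketch with a made-up class, not the full Databricks example):

// Type :paste, paste the imports and the class definition together as one
// block, then press Ctrl-D so they get compiled together.
import org.apache.spark.sql.types.{DataType, DoubleType}

class Example {
  // Pasted line by line, the import is lost and this fails with
  // "error: not found: type DataType".
  def dataType: DataType = DoubleType
}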

But since the problem is related to a Scala issue, can we expect a fix in
Spark 2.0? Or is it not a good idea to update such an important dependency
as Scala in a minor maintenance release?

Kind regards,
Denis

On 22 November 2016 at 22:13, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> The workaround is defining the imports and class together using ":paste".
>
> On Tue, Nov 22, 2016 at 11:12 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> This relates to a known issue: https://issues.apache.org/jira/browse/SPARK-14146
>> and https://issues.scala-lang.org/browse/SI-9799
>>
>> On Tue, Nov 22, 2016 at 6:37 AM, dbolshak <bolshakov.de...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> We have the same issue.
>>>
>>> We use the latest release, 2.0.2.
>>>
>>> A setup with 1.6.1 works fine.
>>>
>>> Could somebody provide a workaround for how to fix this?
>>>
>>> Kind regards,
>>> Denis
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113p28116.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: Pasting into spark-shell doesn't work for Databricks example

2016-11-22 Thread Denis Bolshakov
   StructField("product", DoubleType) :: Nil
>   ^
> :23: error: not found: type DataType
>  override def dataType: DataType = DoubleType
> ^
> :23: error: not found: value DoubleType
>  override def dataType: DataType = DoubleType
>^
> :28: error: not found: type MutableAggregationBuffer
>  override def initialize(buffer: MutableAggregationBuffer): Unit =
> {
>  ^
> :34: error: not found: type MutableAggregationBuffer
>  override def update(buffer: MutableAggregationBuffer, input: Row):
> Unit = {
>  ^
> :34: error: not found: type Row
>  override def update(buffer: MutableAggregationBuffer, input: Row):
> Unit = {
>   ^
> :40: error: not found: type MutableAggregationBuffer
>  override def merge(buffer1: MutableAggregationBuffer, buffer2:
> Row): Unit = {
>  ^
> :40: error: not found: type Row
>  override def merge(buffer1: MutableAggregationBuffer, buffer2:
> Row): Unit = {
> ^
> :46: error: not found: type Row
>  override def evaluate(buffer: Row): Any = {
>^
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pasting-into-spark-shell-doesn-t-work-for-Databricks-example-tp28113.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: spark with kerberos

2016-10-13 Thread Denis Bolshakov
The problem happens when writing (reading works fine):

rdd.saveAsNewAPIHadoopFile
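
Expanded a little, the call is roughly the following (a sketch with
hypothetical key/value types and output path):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Sketch only; the real job points at the kerberized `cluster C` namenode.
val hadoopConf = new Configuration()
rdd.map(v => (NullWritable.get(), new Text(v.toString)))
  .saveAsNewAPIHadoopFile(
    "hdfs://cluster-c-namenode:8020/some/output/path",
    classOf[NullWritable],
    classOf[Text],
    classOf[TextOutputFormat[NullWritable, Text]],
    hadoopConf)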

We use just RDDs and HDFS, nothing else.
Spark version 1.6.1.
`Cluster A` - CDH 5.7.1
`Cluster B` - vanilla Hadoop 2.6.5
`Cluster C` - CDH 5.8.0

Best regards,
Denis

On 13 October 2016 at 13:06, ayan guha <guha.a...@gmail.com> wrote:

> And a few more details on the Spark version, Hadoop version and
> distribution would also help...
>
> On Thu, Oct 13, 2016 at 9:05 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> I think one point you need to mention is your target - HDFS, Hive or
>> HBase (or something else) - and which endpoints are used.
>>
>> On Thu, Oct 13, 2016 at 8:50 PM, dbolshak <bolshakov.de...@gmail.com>
>> wrote:
>>
>>> Hello community,
>>>
>>> We have a challenge and no idea how to solve it.
>>>
>>> The problem:
>>>
>>> Say we have the following environment:
>>> 1. `cluster A`, the cluster does not use Kerberos and we use it as a
>>> source of data; the important thing is that we don't manage this cluster.
>>> 2. `cluster B`, a small cluster where our Spark application is running
>>> and performing some logic (we manage this cluster and it does not have
>>> Kerberos).
>>> 3. `cluster C`, the cluster uses Kerberos and we use it to keep the
>>> results of our Spark application; we manage this cluster.
>>>
>>> Our requirements and conditions not mentioned yet:
>>> 1. All clusters are in a single data center, but in different
>>> subnetworks.
>>> 2. We cannot turn on Kerberos on `cluster A`.
>>> 3. We cannot turn off Kerberos on `cluster C`.
>>> 4. We can turn Kerberos on/off on `cluster B`; currently it's turned off.
>>> 5. The Spark app is built on top of RDDs and does not depend on spark-sql.
>>>
>>> Does anybody know how to write data using the RDD API to a remote cluster
>>> which is running with Kerberos?
>>>
>>> --
>>> //with Best Regards
>>> --Denis Bolshakov
>>> e-mail: bolshakov.de...@gmail.com
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-with-kerberos-tp27894.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Spark with kerberos

2016-10-13 Thread Denis Bolshakov
Hello community,

We have a challenge and no idea how to solve it.

The problem:

Say we have the following environment:
1. `cluster A`, the cluster does not use Kerberos and we use it as a source
of data; the important thing is that we don't manage this cluster.
2. `cluster B`, a small cluster where our Spark application is running and
performing some logic (we manage this cluster and it does not have
Kerberos).
3. `cluster C`, the cluster uses Kerberos and we use it to keep the results
of our Spark application; we manage this cluster.

Our requirements and conditions not mentioned yet:
1. All clusters are in a single data center, but in different subnetworks.
2. We cannot turn on Kerberos on `cluster A`.
3. We cannot turn off Kerberos on `cluster C`.
4. We can turn Kerberos on/off on `cluster B`; currently it's turned off.
5. The Spark app is built on top of RDDs and does not depend on spark-sql.

Does anybody know how to write data using the RDD API to a remote cluster
which is running with Kerberos?

-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: one executor runs multiple parallel tasks VS multiple executors each runs one task

2016-10-11 Thread Denis Bolshakov
Look here
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

Probably it will help a bit.
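
For what it's worth, on standalone the two layouts can be approximated with
settings like these (a sketch; the values are illustrative):

import org.apache.spark.SparkConf

// Conf 1: many single-core executors.
val manySmallExecutors = new SparkConf()
  .set("spark.executor.cores", "1")
  .set("spark.cores.max", "10")

// Conf 2: one multi-core executor per worker.
val oneBigExecutorPerWorker = new SparkConf()
  .set("spark.executor.cores", "5")
  .set("spark.cores.max", "10")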

Best regards,
Denis

On 11 Oct 2016 at 23:49, Xiaoye Sun wrote:

> Hi,
>
> Currently, I am running Spark using the standalone scheduler with 3
> machines in our cluster. For these three machines, one runs Spark Master
> and the other two run Spark Worker.
>
> We run a machine learning application on this small-scale testbed. A
> particular stage in my application is divided into 10 parallel tasks. So I
> want to know the pros and cons for different cluster configurations.
>
> Conf 1: Multiple executors each of which runs one task.
> Each worker has 5 executors; each of the executors has 1 CPU core. In such
> configuration, the scheduler will give one task to each of the executors.
> Each of the tasks probably runs in different JVMs.
>
> Conf 2: One executor running multiple tasks.
> Each worker has only one executor; each executor has 5 CPU cores. In such
> case, the scheduler will give 5 tasks to each executor. Tasks running in
> the same executor probably run in the same process but different threads.
>
> I think in many cases, Conf 2 is preferable to Conf 1 since the tasks in
> the same executor can share the block manager, so data shared among these
> tasks doesn't need to be transferred multiple times (e.g. the broadcast
> data). However, I am wondering if there is a scenario where Conf 1 is
> preferable, and whether the same conclusion holds when the scheduler is
> YARN or Mesos.
>
> Thanks!
>
> Best,
> Xiaoye
>
>


Re: Spark Docker Container - Jars problem when deploying my app

2016-10-11 Thread Denis Bolshakov
Try to build a fat (uber) jar which includes all dependencies.
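
For example, with sbt-assembly the dependency section could look roughly
like this (a sketch; the versions and the kafka artifact are assumptions
based on your stack trace):

// build.sbt (sketch): mark Spark itself as provided, bundle the rest.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"           % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"  // goes into the uber jar
)
// Build with `sbt assembly` (needs the sbt-assembly plugin) and pass the
// resulting jar to SparkConf.setJars / spark-submit.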

On 11 Oct 2016 at 22:11, doruchiulan wrote:

> Hi,
>
> I have a problem that has been bothering me for a few days, and I'm pretty
> out of ideas.
>
> I built a Spark docker container where Spark runs in standalone mode. Both
> master and worker are started there.
>
> Now I tried to deploy my Spark Scala app in a separate container (same
> machine), where I pass the Spark master URL and other stuff I need to
> connect to Spark. The connection is seamless.
>
> First problem I encountered was:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due
> to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
> Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4):
> java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka.KafkaRDDPartition
>
> Then I made a folder of my dependencies except Spark, put them in a folder
> alongside my app JAR file and added them to SparkConf using
> SparkConf.setJars,
>
> Now the strange thing happens:
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due
> to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
> Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException:
> cannot assign instance of scala.collection.immutable.
> List$SerializationProxy
> to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_
> of
> type scala.collection.Seq in instance of
> org.apache.spark.rdd.MapPartitionsRDD
>
> More than this, if I just run the Scala app locally using java -cp
>  myApp.jar it works perfectly, and the jobs
> run ok.
>
> I don't have any SPARK_HOME locally, and setJars basically takes an empty
> list, as if I weren't using it at all.
>
> I guess it uses jars provided in the classpath when I run my app and I
> don't
> need to provide anything else.
>
> If any of you guys have any ideas I would be grateful; I really can't
> explain to myself why this doesn't work, and I haven't done any Spark
> deployments until now. I mostly ran embedded Spark.
>
> Spark is same version in my app dependencies (2.0.0) as the one running in
> the docker container.
> I used Scala 2.11.7 for my app
> Java 1.8 on both containers(app, spark)
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Docker-Container-Jars-problem-when-deploying-my-app-tp27878.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Partition n keys into exacly n partitions

2016-09-12 Thread Denis Bolshakov
Just provide your own partitioner.

Once I wrote a partitioner which keeps similar keys together in one
partition.
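
For the exact case below (n known keys, n partitions) something like this
sketch would do:

import org.apache.spark.Partitioner

// One partition per distinct key; the keys must be known up front.
class ExactKeyPartitioner(keys: Seq[Any]) extends Partitioner {
  private val indexByKey = keys.zipWithIndex.toMap

  override def numPartitions: Int = keys.size

  // Unknown keys fall back to partition 0.
  override def getPartition(key: Any): Int = indexByKey.getOrElse(key, 0)
}

// Usage: pairRdd.partitionBy(new ExactKeyPartitioner(Seq("x", "y" /* ... */)))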

Best regards,
Denis

On 12 September 2016 at 19:44, sujeet jog <sujeet@gmail.com> wrote:

> Hi,
>
> Is there a way to partition set of data with n keys into exactly n
> partitions.
>
> For ex : -
>
> tuple of 1008 rows with key as x
> tuple of 1008 rows with key as y   and so on  total 10 keys ( x, y etc )
>
> Total records = 10080
> NumOfKeys = 10
>
> i want to partition the 10080 elements into exactly 10 partitions with
> each partition having elements with unique key
>
> Is there a way to make this happen ?.. any ideas on implementing custom
> partitioner.
>
>
> The current partitioner I'm using is HashPartitioner, and there are
> cases where key.hashCode() % numPartitions is the same for keys x and y,
>
> hence many elements with different keys fall into a single partition at
> times.
>
>
>
> Thanks,
> Sujeet
>



-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: Spark tasks blockes randomly on standalone cluster

2016-09-12 Thread Denis Bolshakov
Hello,

I see such behavior from time to time.

Similar problem is described here:
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-Memory-Task-hangs-td12377.html

We also use speculative execution as a workaround (our Spark version is 1.6.0).
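
For reference, it's just a couple of settings (a sketch; the multiplier
value is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")
  // re-launch a copy of any task running much slower than the median
  .set("spark.speculation.multiplier", "2")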

But I would like to share one of my observations.
We have two Jenkins servers: one uses Java 7 and the other Java 8.

Sometimes I see the issue during integration testing on the Jenkins with
Java 7 (and never on Java 8).

So I really hope that the issue will disappear after we complete our Java
migration.

Which java version do you use?

Best regards,
Denis

On 12 September 2016 at 15:31, bogdanbaraila <bogdanbara...@gmail.com>
wrote:

> We are having a quite complex application that runs on Spark Standalone.
> In some cases the tasks from one of the workers blocks randomly for an
> infinite amount of time in the RUNNING state.
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27693/SparkStandaloneIssue.png>
>
>
> Extra info:
> - there aren't any errors in the logs
> - ran with the logger in debug and I didn't see any relevant messages (I see
> when the task starts, but then there is no activity for it)
> - the jobs work ok if I have only 1 worker
> - the same job may execute a second time without any issues, in a proper
> amount of time
> - I don't have any really big partitions that could cause delays for some
> of the tasks
> - in Spark 2.0 I've moved from RDDs to Datasets and I have the same issue
> - in Spark 1.4 I was able to overcome the issue by turning on speculation,
> but in Spark 2.0 the blocking tasks are from different workers (while in
> 1.4 I had blocking tasks on only 1 worker), so speculation isn't fixing my
> issue
> - I have the issue in multiple environments, so I don't think it's hardware
> related
>
> Did anyone experience something similar? Any suggestions on how I could
> identify the issue?
>
> Thanks a lot!
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-tasks-blockes-randomly-on-standalone-cluster-tp27693.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: Creating RDD using swebhdfs with truststore

2016-09-04 Thread Denis Bolshakov
Hello,

I would also set the Java opts for the driver.

Best regards,
Denis

On 4 Sep 2016 at 0:31, Sourav Mazumder <sourav.mazumde...@gmail.com> wrote:

> Hi,
>
> I am trying to create a RDD by using swebhdfs to a remote hadoop cluster
> which is protected by Knox and uses SSL.
>
> The code looks like this -
>
> sc.textFile("swebhdfs:/host:port/gateway/default/webhdfs/
> v1/").count.
>
> I'm passing the truststore and truststore password through extra Java options
> while starting the spark shell as -
>
> spark-shell --conf 
> "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=truststor.jks
> -Djavax.net.ssl.trustStorePassword=" --conf "spark.driver.
> extraJavaOptions=-Djavax.net.ssl.trustStore=truststore.jks
> -Djavax.net.ssl.trustStorePassword="
>
> But I'm always getting the error that -
>
> Name: javax.net.ssl.SSLHandshakeException
> Message: Remote host closed connection during handshake
>
> Am I passing the truststore and truststore password in the right way?
>
> Regards,
>
> Sourav
>
>


Re: After calling persist, why the size in sparkui is not matching with the actual file size

2016-08-29 Thread Denis Bolshakov
Hello,

Spark uses snappy compression by default. Is your original file compressed?
Also, it keeps data in its own representation format (column-based), and
that is not the same as the plain text on disk.
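
If you want the number on the Storage tab to be closer to an on-disk size,
one thing to try is caching in serialized form, e.g. (a Scala sketch; the
same idea applies in PySpark):

import org.apache.spark.storage.StorageLevel

// Serialized caching stores partitions as byte arrays instead of
// deserialized objects, which changes the size reported in the UI.
val logData = sc.textFile("/user/rohit_prusty/application2.log")
logData.persist(StorageLevel.MEMORY_ONLY_SER)
logData.count()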

Best regards,
Denis

On 29 August 2016 at 16:52, Rohit Kumar Prusty <rohit_pru...@infosys.com>
wrote:

> Hi Team,
>
> I am new to Spark and have a basic question: after calling persist, why
> does the size in the Spark UI not match the actual file size?
>
>
>
> Actual File Size for “/user/rohit_prusty/application2.log” – *39 KB*
>
>
>
> Code snippet:
>
> ===
>
> logData = sc.textFile("/user/rohit_prusty/application2.log")
>
> logData.persist()
>
> logData.count()
>
> errors = logData.filter(lambda line: "ERROR" in line)
>
> errors.persist()
>
> errors.count()
>
>
>
> Output in SparkUI
>
> ==
>
> logData RDD takes *2.1 KB*
>
> errors RDD takes *1.3 KB*
>
>
>
> Regards
>
> Rohit Kumar Prusty
>
> +91-9884070075
>
>
>



-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com


Re: How to acess the WrappedArray

2016-08-29 Thread Denis Bolshakov
Hello,

Not sure whether it will help, but I would do the following:

1. Create a case class (or, in Java, a bean class) which matches your JSON schema.
2. Change the following line:
old:
Dataset<Row> rows_salaries = spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json");
new:
Dataset<MyCaseClass> rows_salaries = spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json").as(Encoders.bean(MyCaseClass.class));
3. Make your code compile successfully.
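
Alternatively, if you only need to pull the elements out of the
WrappedArray, something like this works (a Scala sketch; it assumes column 0
holds the array of arrays shown in your output):

// The Java equivalent of getSeq is row.getList(index).
spark.sql("select * from salaries").collect().foreach { r =>
  val outer = r.getSeq[Seq[Any]](0)            // unwraps the outer WrappedArray
  outer.foreach(inner => println(inner.mkString(", ")))
}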

BR,
Denis

On 29 August 2016 at 12:27, Sree Eedupuganti <s...@inndata.in> wrote:

> Here is the snippet of code :
>
> //The entry point into all functionality in Spark is the SparkSession
> class. To create a basic SparkSession, just use SparkSession.builder():
>
> SparkSession spark = SparkSession.builder().appName("Java Spark SQL
> Example").master("local").getOrCreate();
>
> //With a SparkSession, applications can create DataFrames from an existing
> RDD, from a Hive table, or from Spark data sources.
>
> Dataset<Row> rows_salaries = spark.read().json("/Users/sreeharsha/Downloads/rows_salaries.json");
>
> // Register the DataFrame as a SQL temporary view
>
> rows_salaries.createOrReplaceTempView("salaries");
>
> // SQL statements can be run by using the sql methods provided by spark
>
> List<Row> df = spark.sql("select * from salaries").collectAsList();
>
> for(Row r:df){
>
> if(r.get(0)!=null)
>
>System.out.println(r.get(0).toString());
>
>
> }
>
>
> Actual Output:
>
> WrappedArray(WrappedArray(1, B9B42DE1-E810-4489-9735-B365A47A4012, 1,
> 1467358044, 697390, 1467358044, 697390, null, Aaron,Patricia G,
> Facilities/Office Services II, A03031, OED-Employment Dev (031),
> 1979-10-24T00:00:00, 56705.00, 54135.44))
>
> Expected Output:
>
> Need elements from the WrappedArray
>
> Below you can find the attachment of .json file
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
//with Best Regards
--Denis Bolshakov
e-mail: bolshakov.de...@gmail.com