Re: how to merge dataframe write output files

2016-11-10 Thread Jorge Sánchez
Do you have the logs of the containers? This seems like a Memory issue.

2016-11-10 7:28 GMT+00:00 lk_spark :

> hi all:
> when I call the API df.write.parquet, there are a lot of small files: how
> can I merge them into one file? I tried df.coalesce(1).write.parquet, but
> it sometimes fails with an error:
>
> Container exited with a non-zero exit code 143
>
> more and more...
> -rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc.snappy.parquet
> -rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11
> /parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-
> 23e8-40bb-b30b-09652ca677bc
> more and more...
> 2016-11-10
> --
> lk_spark
>


RE: Strongly Connected Components

2016-11-10 Thread Shreya Agarwal
Yesterday's run died sometime during the night, without any errors. Today, I am 
running it using GraphFrames instead. It is still spawning new tasks, so there 
is progress.

From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Thursday, November 10, 2016 7:50 PM
To: user@spark.apache.org; Shreya Agarwal 
Subject: Re: Strongly Connected Components

It is possible it is dead. Could you check the Spark UI to see if there is any 
progress?

_
From: Shreya Agarwal
Sent: Thursday, November 10, 2016 12:45 AM
Subject: RE: Strongly Connected Components
To: >



Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya
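
For reference, a minimal GraphX sketch of the call being discussed above (the tiny edge
list and numIter = 5 are purely illustrative; the real graph and iteration count come
from your own data):

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

// Tiny illustrative graph; in practice `edges` is built from the real data set
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 1L, 0), Edge(3L, 4L, 0)))

// Persist to MEMORY_AND_DISK, as in the run described above
val graph = Graph.fromEdges(edges, defaultValue = 0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

// numIter bounds the propagation rounds; long cycles need more iterations
val scc = graph.stronglyConnectedComponents(numIter = 5)
scc.vertices.collect().foreach(println)  // (vertexId, smallest vertexId in its SCC)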



Re: Strongly Connected Components

2016-11-10 Thread Felix Cheung
It is possible it is dead. Could you check the Spark UI to see if there is any 
progress?


_
From: Shreya Agarwal
Sent: Thursday, November 10, 2016 12:45 AM
Subject: RE: Strongly Connected Components
To: >


Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya




Re: Newbie question - Best way to bootstrap with Spark

2016-11-10 Thread jggg777
A couple options:

(1) You can start locally by downloading Spark to your laptop:
http://spark.apache.org/downloads.html , then jump into the Quickstart docs:
http://spark.apache.org/docs/latest/quick-start.html

(2) There is a free Databricks community edition that runs on AWS:
https://databricks.com/try-databricks .  The databricks docs are publicly
available and have tutorial notebooks:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html

If you want to run it on a several node cluster for bigger data, it's pretty
easy through the AWS console to spin up an Elastic MapReduce cluster with
Spark pre-installed, but you'll need to sign up for an AWS account.
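
To make option (1) concrete, a first session is just a few lines in spark-shell once
Spark is unpacked (the file name is a placeholder; any local text file works):

// Run ./bin/spark-shell from the unpacked Spark directory, then:
val lines = spark.read.textFile("README.md")
println(s"lines: ${lines.count()}")
println(s"lines mentioning Spark: ${lines.filter(_.contains("Spark")).count()}")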



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Best-way-to-bootstrap-with-Spark-tp28032p28061.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Joining to a large, pre-sorted file

2016-11-10 Thread Silvio Fiorito
You want to look at the bucketBy option when you save the master file out. That 
way it will be pre-partitioned by the join column, eliminating the shuffle on 
the larger file.
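
As a rough sketch of that suggestion (table, column and bucket-count names below are
placeholders, not from the thread; bucketBy in Spark 2.x requires saveAsTable rather
than a plain save):

// Pre-partition (and pre-sort) the master data by the join column
masterDF.write
  .bucketBy(200, "joinKey")
  .sortBy("joinKey")
  .saveAsTable("master_bucketed")

// Later job: the bucketed side is already laid out by joinKey, so the
// shuffle is only needed on the (smaller) transaction side
val master = spark.table("master_bucketed")
val joined = transactionsDF.join(master, "joinKey")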



From: Stuart White 
Date: Thursday, November 10, 2016 at 8:39 PM
To: Jörn Franke 
Cc: "user@spark.apache.org" 
Subject: Re: Joining to a large, pre-sorted file

Yes.  In my original question, when I said I wanted to pre-sort the master 
file, I should have said "pre-sort and pre-partition the file".
Years ago, I did this with Hadoop MapReduce.  I pre-sorted/partitioned the 
master file into N partitions.  Then, when a transaction file would arrive, I 
would sort/partition the transaction file on the join key into N partitions.  
Then I could perform what was called a mapside join.
Basically, I want to do the same thing in Spark.  And it looks like all the 
pieces to accomplish this exist, but I can't figure out how to connect all the 
dots.  It seems like this functionality is pretty new so there aren't a lot of 
examples available.

On Thu, Nov 10, 2016 at 7:33 PM, Jörn Franke 
> wrote:
Can you split the files beforehand in several files (e.g. By the column you do 
the join on?) ?

On 10 Nov 2016, at 23:45, Stuart White 
> wrote:
I have a large "master" file (~700m records) that I frequently join smaller 
"transaction" files to.  (The transaction files have 10's of millions of 
records, so too large for a broadcast join).
I would like to pre-sort the master file, write it to disk, and then, in 
subsequent jobs, read the file off disk and join to it without having to 
re-sort it.  I'm using Spark SQL, and my understanding is that the Spark 
Catalyst Optimizer will choose an optimal join algorithm if it is aware that 
the datasets are sorted.  So, the trick is to make the optimizer aware that the 
master file is already sorted.
I think SPARK-12394 provides 
this functionality, but I can't seem to put the pieces together for how to use 
it.
Could someone possibly provide a simple example of how to:

  1.  Sort a master file by a key column and write it to disk in such a way 
that its "sorted-ness" is preserved.
  2.  In a later job, read a transaction file, sort/partition it as necessary.  
Read the master file, preserving its sorted-ness.  Join the two DataFrames in 
such a way that the master rows are not sorted again.
Thanks!



Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-10 Thread Shixiong(Ryan) Zhu
Yeah, the KafkaRDD cannot be reused. It's better to document it.

On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy  wrote:

> Ok, I have split the KafkaRDD logic so each uses its own group and bumped
> the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
> ends up with a timeout and exception, so I am still perplexed on that one.
> The new error I am getting now is a `ConcurrentModificationException`
> when Spark is trying to remove the CachedKafkaConsumer.
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1361)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>
> Here is the basic logic:
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They each use a distinct consumer
> group (s"$prefix-$topic"), each has its own topic, and each topic has
> 4 partitions. We routinely get timeout errors when polling for data when
> the poll.ms is less than 2 seconds. This occurs whether or not we process
> in parallel.
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group "$prefix-$
> topic"
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
> kafkaParams asJava,
> offsetRanges,
> PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
> I am not sure why the concurrency issue is there, as I have tried to debug
> and also looked at the KafkaConsumer code, but everything looks
> like it should not occur. The things to figure out are why this occurs when
> running in parallel, and also why the timeouts still occur.
>
> Thanks,
>
> Ivan
>
> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger 
> wrote:
>
>> There definitely is Kafka documentation indicating that you should use
>> a different consumer group for logically different subscribers, this
>> is really basic to Kafka:
>>
>> http://kafka.apache.org/documentation#intro_consumers
>>
>> As for your comment that "commit async after each RDD, which is not
>> really viable also", how is it not viable?  Again, committing offsets
>> to Kafka doesn't give you reliable delivery semantics unless your
>> downstream data store is idempotent.  If your downstream data store is
>> idempotent, then it shouldn't matter to you when offset commits
>> happen, as long as they happen within a reasonable time after the data
>> is written.
>>
>> Do you want to keep arguing with me, or follow my advice and proceed
>> with debugging any remaining issues after you make the changes I
>> suggested?
>>
>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy  wrote:
>> > With our stream version, we update the offsets for only the partition we
>> > are operating on. We even break down the partition into smaller batches and
>> then
>> > update the offsets after each batch within the partition. With Spark
>> 1.6 and
>> > Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
>> > necessarily a Spark issue since Kafka no longer allows you to simply
>> update
>> > the offsets for a given consumer group. You have to subscribe or assign
>> > partitions to even do so.
>> >
>> > As for storing the offsets in some other place like a DB, I don't find
>> this
>> > useful because you then can't use tools like Kafka Manager. In order to
>> do
>> > so you would have to store in a DB and then circle back and update Kafka
>> > afterwards. This means you have to keep two sources in sync which is not
>> > really a good idea.
>> >
>> > It is a challenge in Spark to use the Kafka offsets since the driver
>> keeps
>> > subscribed to the topic(s) and consumer group, while the executors
>> prepend
>> > "spark-executor-" to the consumer group. The stream (driver) does allow
>> you
>> > to commit async after each RDD, which is not really viable also. I have
>> thought
>> > of implementing an Akka actor system on the driver and send it messages
>> from
>> > the executor code to update the offsets, but then that is asynchronous
>> as
>> > well so not really a good solution.
>> >
>> > I have no idea why Kafka made this change and also why in the parallel
>> > KafkaRDD application we would be advised to use different consumer
>> groups
>> > for each RDD. That 

Re: Joining to a large, pre-sorted file

2016-11-10 Thread Stuart White
Yes.  In my original question, when I said I wanted to pre-sort the master
file, I should have said "pre-sort and pre-partition the file".

Years ago, I did this with Hadoop MapReduce.  I pre-sorted/partitioned the
master file into N partitions.  Then, when a transaction file would arrive,
I would sort/partition the transaction file on the join key into N
partitions.  Then I could perform what was called a mapside join.

Basically, I want to do the same thing in Spark.  And it looks like all the
pieces to accomplish this exist, but I can't figure out how to connect all
the dots.  It seems like this functionality is pretty new so there aren't a
lot of examples available.


On Thu, Nov 10, 2016 at 7:33 PM, Jörn Franke  wrote:

> Can you split the files beforehand in several files (e.g. By the column
> you do the join on?) ?
>
> On 10 Nov 2016, at 23:45, Stuart White  wrote:
>
> I have a large "master" file (~700m records) that I frequently join
> smaller "transaction" files to.  (The transaction files have 10's of
> millions of records, so too large for a broadcast join).
>
> I would like to pre-sort the master file, write it to disk, and then, in
> subsequent jobs, read the file off disk and join to it without having to
> re-sort it.  I'm using Spark SQL, and my understanding is that the Spark
> Catalyst Optimizer will choose an optimal join algorithm if it is aware
> that the datasets are sorted.  So, the trick is to make the optimizer aware
> that the master file is already sorted.
>
> I think SPARK-12394 
> provides this functionality, but I can't seem to put the pieces together
> for how to use it.
>
> Could someone possibly provide a simple example of how to:
>
>1. Sort a master file by a key column and write it to disk in such a
>way that its "sorted-ness" is preserved.
>2. In a later job, read a transaction file, sort/partition it as
>necessary.  Read the master file, preserving its sorted-ness.  Join the two
>DataFrames in such a way that the master rows are not sorted again.
>
> Thanks!
>
>


Re: Joining to a large, pre-sorted file

2016-11-10 Thread Jörn Franke
Can you split the files beforehand in several files (e.g. By the column you do 
the join on?) ? 

> On 10 Nov 2016, at 23:45, Stuart White  wrote:
> 
> I have a large "master" file (~700m records) that I frequently join smaller 
> "transaction" files to.  (The transaction files have 10's of millions of 
> records, so too large for a broadcast join).
> 
> I would like to pre-sort the master file, write it to disk, and then, in 
> subsequent jobs, read the file off disk and join to it without having to 
> re-sort it.  I'm using Spark SQL, and my understanding is that the Spark 
> Catalyst Optimizer will choose an optimal join algorithm if it is aware that 
> the datasets are sorted.  So, the trick is to make the optimizer aware that 
> the master file is already sorted.
> 
> I think SPARK-12394 provides this functionality, but I can't seem to put the 
> pieces together for how to use it. 
> 
> Could someone possibly provide a simple example of how to:
> Sort a master file by a key column and write it to disk in such a way that 
> its "sorted-ness" is preserved.
> In a later job, read a transaction file, sort/partition it as necessary.  
> Read the master file, preserving its sorted-ness.  Join the two DataFrames in 
> such a way that the master rows are not sorted again.
> Thanks!
> 


Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
Sure, will look into the tests.

Thank you so much for your time!


Tariq, Mohammad
about.me/mti



On Fri, Nov 11, 2016 at 4:35 AM, Marcelo Vanzin  wrote:

> Sorry, it's kinda hard to give any more feedback from just the info you
> provided.
>
> I'd start with some working code like this from Spark's own unit tests:
> https://github.com/apache/spark/blob/a8ea4da8d04c1ed621a96668118f20
> 739145edd2/yarn/src/test/scala/org/apache/spark/deploy/
> yarn/YarnClusterSuite.scala#L164
>
>
> On Thu, Nov 10, 2016 at 3:00 PM, Mohammad Tariq 
> wrote:
>
>> All I want to do is submit a job, and keep on getting states as soon as
>> it changes, and come out once the job is over. I'm sorry to be a pest of
>> questions. Kind of having a bit of tough time making this work.
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>> On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq 
>> wrote:
>>
>>> Yeah, that definitely makes sense. I was just trying to make it work
>>> somehow. The problem is that it's not at all calling the listeners, hence
>>> i'm unable to do anything. Just wanted to cross check it by looping inside.
>>> But I get the point. thank you for that!
>>>
>>> I'm on YARN(cluster mode).
>>>
>>>
>>> Tariq, Mohammad
>>> about.me/mti
>>>
>>>
>>> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
>>> wrote:
>>>
 On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
 wrote:
 >   @Override
 >   public void stateChanged(SparkAppHandle handle) {
 > System.out.println("Spark App Id [" + handle.getAppId() + "].
 State [" + handle.getState() + "]");
 > while(!handle.getState().isFinal()) {

 You shouldn't loop in an event handler. That's not really how
 listeners work. Instead, use the event handler to update some local
 state, or signal some thread that's waiting for the state change.

 Also be aware that handles currently only work in local and yarn
 modes; the state updates haven't been hooked up to standalone mode
 (maybe for client mode, but definitely not cluster) nor mesos.

 --
 Marcelo

>>>
>>>
>>
>
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-10 Thread Marcelo Vanzin
Sorry, it's kinda hard to give any more feedback from just the info you
provided.

I'd start with some working code like this from Spark's own unit tests:
https://github.com/apache/spark/blob/a8ea4da8d04c1ed621a96668118f20739145edd2/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala#L164


On Thu, Nov 10, 2016 at 3:00 PM, Mohammad Tariq  wrote:

> All I want to do is submit a job, and keep on getting states as soon as it
> changes, and come out once the job is over. I'm sorry to be a pest of
> questions. Kind of having a bit of tough time making this work.
>
>
> Tariq, Mohammad
> about.me/mti
>
>
> On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq 
> wrote:
>
>> Yeah, that definitely makes sense. I was just trying to make it work
>> somehow. The problem is that it's not at all calling the listeners, hence
>> i'm unable to do anything. Just wanted to cross check it by looping inside.
>> But I get the point. thank you for that!
>>
>> I'm on YARN(cluster mode).
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
>> wrote:
>>
>>> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
>>> wrote:
>>> >   @Override
>>> >   public void stateChanged(SparkAppHandle handle) {
>>> > System.out.println("Spark App Id [" + handle.getAppId() + "].
>>> State [" + handle.getState() + "]");
>>> > while(!handle.getState().isFinal()) {
>>>
>>> You shouldn't loop in an event handler. That's not really how
>>> listeners work. Instead, use the event handler to update some local
>>> state, or signal some thread that's waiting for the state change.
>>>
>>> Also be aware that handles currently only work in local and yarn
>>> modes; the state updates haven't been hooked up to standalone mode
>>> (maybe for client mode, but definitely not cluster) nor mesos.
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>


-- 
Marcelo


Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
All I want to do is submit a job, keep getting state updates as soon as the
state changes, and exit once the job is over. I'm sorry to pester you with
questions; I'm having a bit of a tough time making this work.


Tariq, Mohammad
about.me/mti



On Fri, Nov 11, 2016 at 4:27 AM, Mohammad Tariq  wrote:

> Yeah, that definitely makes sense. I was just trying to make it work
> somehow. The problem is that it's not at all calling the listeners, hence
> i'm unable to do anything. Just wanted to cross check it by looping inside.
> But I get the point. thank you for that!
>
> I'm on YARN(cluster mode).
>
>
> Tariq, Mohammad
> about.me/mti
>
>
> On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin 
> wrote:
>
>> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
>> wrote:
>> >   @Override
>> >   public void stateChanged(SparkAppHandle handle) {
>> > System.out.println("Spark App Id [" + handle.getAppId() + "]. State
>> [" + handle.getState() + "]");
>> > while(!handle.getState().isFinal()) {
>>
>> You shouldn't loop in an event handler. That's not really how
>> listeners work. Instead, use the event handler to update some local
>> state, or signal some thread that's waiting for the state change.
>>
>> Also be aware that handles currently only work in local and yarn
>> modes; the state updates haven't been hooked up to standalone mode
>> (maybe for client mode, but definitely not cluster) nor mesos.
>>
>> --
>> Marcelo
>>
>
>


Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
Yeah, that definitely makes sense. I was just trying to make it work
somehow. The problem is that it's not calling the listeners at all, hence
I'm unable to do anything. I just wanted to cross-check it by looping inside.
But I get the point, thank you for that!

I'm on YARN(cluster mode).


Tariq, Mohammad
about.me/mti



On Fri, Nov 11, 2016 at 4:19 AM, Marcelo Vanzin  wrote:

> On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq 
> wrote:
> >   @Override
> >   public void stateChanged(SparkAppHandle handle) {
> > System.out.println("Spark App Id [" + handle.getAppId() + "]. State
> [" + handle.getState() + "]");
> > while(!handle.getState().isFinal()) {
>
> You shouldn't loop in an event handler. That's not really how
> listeners work. Instead, use the event handler to update some local
> state, or signal some thread that's waiting for the state change.
>
> Also be aware that handles currently only work in local and yarn
> modes; the state updates haven't been hooked up to standalone mode
> (maybe for client mode, but definitely not cluster) nor mesos.
>
> --
> Marcelo
>


Re: Correct SparkLauncher usage

2016-11-10 Thread Marcelo Vanzin
On Thu, Nov 10, 2016 at 2:43 PM, Mohammad Tariq  wrote:
>   @Override
>   public void stateChanged(SparkAppHandle handle) {
> System.out.println("Spark App Id [" + handle.getAppId() + "]. State [" + 
> handle.getState() + "]");
> while(!handle.getState().isFinal()) {

You shouldn't loop in an event handler. That's not really how
listeners work. Instead, use the event handler to update some local
state, or signal some thread that's waiting for the state change.

Also be aware that handles currently only work in local and yarn
modes; the state updates haven't been hooked up to standalone mode
(maybe for client mode, but definitely not cluster) nor mesos.

-- 
Marcelo
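
To illustrate that pattern, here is a minimal sketch (Scala, for brevity; app
resource, main class and master are placeholders): register the listener when
calling startApplication(), have stateChanged() signal a latch, and let the main
thread wait on it instead of looping in the handler.

import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherSketch {
  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)

    val listener = new SparkAppHandle.Listener {
      override def stateChanged(handle: SparkAppHandle): Unit = {
        println(s"State [${handle.getState}], app id [${handle.getAppId}]")
        if (handle.getState.isFinal) done.countDown() // signal, don't loop
      }
      override def infoChanged(handle: SparkAppHandle): Unit = ()
    }

    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")   // placeholder
      .setMainClass("org.example.Main")     // placeholder
      .setMaster("yarn")
      .startApplication(listener)           // listeners are passed here

    done.await()                            // wait for a final state
    println(s"Final state: ${handle.getState}")
  }
}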




Joining to a large, pre-sorted file

2016-11-10 Thread Stuart White
I have a large "master" file (~700m records) that I frequently join smaller
"transaction" files to.  (The transaction files have 10's of millions of
records, so too large for a broadcast join).

I would like to pre-sort the master file, write it to disk, and then, in
subsequent jobs, read the file off disk and join to it without having to
re-sort it.  I'm using Spark SQL, and my understanding is that the Spark
Catalyst Optimizer will choose an optimal join algorithm if it is aware
that the datasets are sorted.  So, the trick is to make the optimizer aware
that the master file is already sorted.

I think SPARK-12394 
provides this functionality, but I can't seem to put the pieces together
for how to use it.

Could someone possibly provide a simple example of how to:

   1. Sort a master file by a key column and write it to disk in such a way
   that its "sorted-ness" is preserved.
   2. In a later job, read a transaction file, sort/partition it as
   necessary.  Read the master file, preserving its sorted-ness.  Join the two
   DataFrames in such a way that the master rows are not sorted again.

Thanks!


Re: Correct SparkLauncher usage

2016-11-10 Thread Mohammad Tariq
Hi Marcelo,

After a few changes I got it working. However I could not understand one
thing. I need to call Thread.sleep() and then get the state explicitly in
order to make it work.

Also, no matter what I do my launcher program doesn't call stateChanged()
or infoChanged(). Here is my code :

public class RMLauncher implements SparkAppHandle.Listener {

  public static void main(String[] args) {

Map<String, String> map = new HashMap<>();
map.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
map.put("KRB5CCNAME", "/tmp/sparkjob");
map.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
launchSparkJob(map);
  }

  public static void launchSparkJob(Map<String, String> map) {
SparkAppHandle handle = null;
try {
  handle = new SparkLauncher(map).startApplication();
} catch (IOException e) {
  e.printStackTrace();
}
  }

  @Override
  public void stateChanged(SparkAppHandle handle) {
System.out.println("Spark App Id [" + handle.getAppId() + "]. State ["
+ handle.getState() + "]");
while(!handle.getState().isFinal()) {
  System.out.println(" state is not final yet");
  System.out.println(" sleeping for a second");
  try {
Thread.sleep(1000L);
  } catch (InterruptedException e) {
  }
}
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
System.out.println("Spark App Id [" + handle.getAppId() + "] State
Changed. State [" + handle.getState() + "]");
  }
}

I have set all the required properties and I'm able to submit and run spark
jobs successfully. Any pointers would be really helpful.

Thanks again!


Tariq, Mohammad
about.me/mti



On Tue, Nov 8, 2016 at 5:16 AM, Marcelo Vanzin  wrote:

> Then you need to look at your logs to figure out why the child app is not
> working. "startApplication" will by default redirect the child's output to
> the parent's logs.
>
> On Mon, Nov 7, 2016 at 3:42 PM, Mohammad Tariq  wrote:
>
>> Hi Marcelo,
>>
>> Thank you for the prompt response. I tried adding listeners as well,
>> didn't work either. Looks like it isn't starting the job at all.
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>>
>>
>> On Tue, Nov 8, 2016 at 5:06 AM, Marcelo Vanzin 
>> wrote:
>>
>>> On Mon, Nov 7, 2016 at 3:29 PM, Mohammad Tariq 
>>> wrote:
>>> > I have been trying to use SparkLauncher.startApplication() to launch
>>> a Spark app from within java code, but unable to do so. However, same piece
>>> of code is working if I use SparkLauncher.launch().
>>> >
>>> > Here are the corresponding code snippets :
>>> >
>>> > SparkAppHandle handle = new SparkLauncher()
>>> >
>>> > .setSparkHome("/Users/miqbal1/DISTRIBUTED_WORLD/UNPACKED/sp
>>> ark-1.6.1-bin-hadoop2.6")
>>> >
>>> > .setJavaHome("/Library/Java/JavaVirtualMachines/jdk1.8.0_92
>>> .jdk/Contents/Home")
>>> >
>>> > .setAppResource("/Users/miqbal1/wc.jar").setMainClass("org.
>>> myorg.WC").setMaster("local")
>>> >
>>> > .setConf("spark.dynamicAllocation.enabled",
>>> "true").startApplication();System.out.println(handle.getAppId());
>>> >
>>> > System.out.println(handle.getState());
>>> >
>>> > This prints null and UNKNOWN as output.
>>>
>>> The information you're printing is not available immediately after you
>>> call "startApplication()". The Spark app is still starting, so it may
>>> take some time for the app ID and other info to be reported back. The
>>> "startApplication()" method allows you to provide listeners you can
>>> use to know when that information is available.
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>
>
> --
> Marcelo
>


Anyone using ProtoBuf for Kafka messages with Spark Streaming for processing?

2016-11-10 Thread shyla deshpande
Using ProtoBuf for Kafka messages with Spark Streaming because ProtoBuf  is
already being used in the system.

Some sample code and reading material for using ProtoBuf for Kafka messages
with Spark Streaming will be helpful.

Thanks


Re: Access_Remote_Kerberized_Cluster_Through_Spark

2016-11-10 Thread KhajaAsmath Mohammed
Hi Ajay,

I was able to resolve it by adding the YARN user principal. Here is the
complete code.

  def main(args: Array[String]) {
// create Spark context with Spark configuration
val cmdLine = Parse.commandLine(args)
val configFile = cmdLine.getOptionValue("c")
val propertyConfiguration = new PropertyConfiguration()
val props = propertyConfiguration.get(configFile)
   // val
fs=com.yourcompany.telematics.fs.FileSystem.getHdfsFileSystem(props);
val fs=com.yourcompany.telematics.fs.FileSystem getHdfsFileSystem props;
var sparkConfig = propertyConfiguration.initConfiguration()
val sc = new SparkContext(sparkConfig)
val
configuration:org.apache.hadoop.conf.Configuration=sc.hadoopConfiguration

java.lang.System.setProperty("javax.security.auth.useSubjectCredsOnly","true");
java.lang.System.setProperty("java.security.krb5.conf",
"C:\\krb5.conf");
System.setProperty("sun.security.krb5.debug", "true")
configuration.set("hadoop.security.authentication", "Kerberos");
configuration.set("hdfs.namenode.kerberos.principal","hdfs/_
h...@ad.yourcompany.com")
configuration.set("hdfs.datanode.kerberos.principal.pattern","hdfs/*@
AD.yourcompany.COM")
configuration.set("hdfs.master.kerberos.principal","hdfs/*@
AD.yourcompany.COM")
configuration.set("yarn.nodemanager.principal","yarn/*@
AD.yourcompany.COM")
configuration.set("yarn.resourcemanager.principal","yarn/*@
AD.yourcompany.COM")
val hadoopConf="C:\\devtools\\hadoop\\hadoop-2.2.0\\hadoop-2.2.0\\conf"

configuration.addResource(new Path(hadoopConf + "core-site.xml"));
configuration.addResource(new Path(hadoopConf + "hdfs-site.xml"));
configuration.addResource(new Path(hadoopConf + "mapred-site.xml"));
configuration.addResource(new Path(hadoopConf + "yarn-site.xml"));
configuration.addResource(new Path(hadoopConf + "hadoop-policy.xml"));
configuration.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab("va_d...@ad.yourcompany.com",
"C:\\va_dflt.keytab");


// get threshold
   // val threshold = args(1).toInt

// read in text file and split each document into words
val lineRdd =
sc.textFile("hdfs://XX:8020/user/yyy1k78/vehscanxmltext")
val tokenized=lineRdd.flatMap(_.split(" "))
//System.out.println(tokenized.collect().mkString(", "))
// count the occurrence of each word
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

// filter out words with fewer than threshold occurrences
//val filtered = wordCounts.filter(_._2 >= threshold)

// count characters
//val charCounts = filtered.flatMap(_._1.toCharArray).map((_,
1)).reduceByKey(_ + _)

System.out.println(wordCounts.collect().mkString(", "))
  }
}

Thanks,
Asmath.

On Wed, Nov 9, 2016 at 7:44 PM, Ajay Chander  wrote:

> Hi Everyone,
>
> I am still trying to figure this one out. I am stuck with this error 
> "java.io.IOException:
> Can't get Master Kerberos principal for use as renewer ". Below is my code.
> Can any of you please provide any insights on this? Thanks for your time.
>
>
> import java.io.{BufferedInputStream, File, FileInputStream}
> import java.net.URI
>
> import org.apache.hadoop.fs.FileSystem
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.io.IOUtils
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.spark.deploy.SparkHadoopUtil
> import org.apache.spark.{SparkConf, SparkContext}
>
>
> object SparkHdfs {
>
>   def main(args: Array[String]): Unit = {
>
> System.setProperty("java.security.krb5.conf", new 
> File("src\\main\\files\\krb5.conf").getAbsolutePath )
> System.setProperty("sun.security.krb5.debug", "true")
>
> val sparkConf = new SparkConf().setAppName("SparkHdfs").setMaster("local")
> val sc = new SparkContext(sparkConf)
> //Loading remote cluster configurations
> sc.hadoopConfiguration.addResource(new 
> File("src\\main\\files\\core-site.xml").getAbsolutePath )
> sc.hadoopConfiguration.addResource(new 
> File("src\\main\\files\\hdfs-site.xml").getAbsolutePath )
> sc.hadoopConfiguration.addResource(new 
> File("src\\main\\files\\mapred-site.xml").getAbsolutePath )
> sc.hadoopConfiguration.addResource(new 
> File("src\\main\\files\\yarn-site.xml").getAbsolutePath )
> sc.hadoopConfiguration.addResource(new 
> File("src\\main\\files\\ssl-client.xml").getAbsolutePath )
> sc.hadoopConfiguration.addResource(new 
> File("src\\main\\files\\topology.map").getAbsolutePath )
>
> val conf = new Configuration()
> //Loading remote cluster configurations
> conf.addResource(new Path(new 
> File("src\\main\\files\\core-site.xml").getAbsolutePath ))
> conf.addResource(new Path(new 
> File("src\\main\\files\\hdfs-site.xml").getAbsolutePath ))
> conf.addResource(new Path(new 
> 

Re: UDF with column value comparison fails with PySpark

2016-11-10 Thread Perttu Ranta-aho
So it was something obvious, thanks!

-Perttu

to 10. marraskuuta 2016 klo 21.19 Davies Liu 
kirjoitti:

> On Thu, Nov 10, 2016 at 11:14 AM, Perttu Ranta-aho 
> wrote:
> > Hello,
> >
> > I want to create an UDF which modifies one column value depending on
> value
> > of some other column. But Python version of the code fails always in
> column
> > value comparison. Below are simple examples, scala version works as
> expected
> > but Python version throws an execption. Am I missing something obvious?
> As
> > can be seen from PySpark exception I'm using Spark 2.0.1.
> >
> > -Perttu
> >
> > import org.apache.spark.sql.functions.udf
> > val df = spark.createDataFrame(List(("a",1), ("b",2), ("c",
> > 3))).withColumnRenamed("_1", "name").withColumnRenamed("_2", "value")
> > def myUdf = udf((name: String, value: Int) => {if (name == "c") { value
> * 2
> > } else { value }})
> > df.withColumn("udf", myUdf(df("name"), df("value"))).show()
> > ++-+---+
> > |name|value|udf|
> > ++-+---+
> > |   a|1|  1|
> > |   b|2|  2|
> > |   c|3|  6|
> > ++-+---+
> >
> >
> > from pyspark.sql.types import StringType, IntegerType
> > import pyspark.sql.functions as F
> >
> > df = sqlContext.createDataFrame((('a',1), ('b',2), ('c', 3)),
> > ('name','value'))
> >
> > def my_udf(name, value):
> > if name == 'c':
> > return value * 2
> > return value
> > F.udf(my_udf, IntegerType())
>
> udf = F.udf(my_udf, IntegerType())
> df.withColumn("udf", udf(df.name, df.value)).show()
>
> >
> > df.withColumn("udf", my_udf(df.name, df.value)).show()
> >
> >
> ---
> > ValueErrorTraceback (most recent call
> last)
> >  in ()
> > > 1 df.withColumn("udf", my_udf(df.name, df.value)).show()
> >
> >  in my_udf(name, value)
> >   3
> >   4 def my_udf(name, value):
> > > 5 if name == 'c':
> >   6 return value * 2
> >   7 return value
> >
> > /home/ec2-user/spark-2.0.1-bin-hadoop2.4/python/pyspark/sql/column.pyc in
> > __nonzero__(self)
> > 425
> > 426 def __nonzero__(self):
> > --> 427 raise ValueError("Cannot convert column into bool: please
> > use '&' for 'and', '|' for 'or', "
> > 428  "'~' for 'not' when building DataFrame
> > boolean expressions.")
> > 429 __bool__ = __nonzero__
> >
> > ValueError: Cannot convert column into bool: please use '&' for 'and',
> '|'
> > for 'or', '~' for 'not' when building DataFrame boolean expressions.
>


Re: type-safe join in the new DataSet API?

2016-11-10 Thread Michael Armbrust
You can groupByKey and then cogroup.
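
A rough sketch of that approach (case classes and column names are illustrative):
the key extractors are ordinary typed functions, so a mismatch in key types fails
at compile time rather than at runtime.

case class Rec1(col1: Int, col2: String)
case class Rec2(col3: Int, col4: String)

import spark.implicits._
val ds1 = spark.sql("select col1, col2 from mytable").as[Rec1]
val ds2 = spark.sql("select col3, col4 from mytable2").as[Rec2]

val joined = ds1.groupByKey(_.col1).cogroup(ds2.groupByKey(_.col3)) {
  (key, lefts, rights) =>
    val rs = rights.toSeq                    // iterators can only be traversed once
    lefts.flatMap(l => rs.map(r => (l, r)))  // inner-join semantics per key
}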

On Thu, Nov 10, 2016 at 10:44 AM, Yang  wrote:

> the new DataSet API is supposed to provide type safety and type checks at
> compile time https://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#join-operations
>
> It does this indeed for a lot of places, but I found it still doesn't have
> a type safe join:
>
> val ds1 = hc.sql("select col1, col2 from mytable")
>
> val ds2 = hc.sql("select col3 , col4 from mytable2")
>
> val ds3 = ds1.joinWith(ds2, ds1.col("col1") === ds2.col("col2"))
>
> here spark has no way to make sure (at compile time) that the two columns
> being joined together
> , "col1" and "col2" are of matching types. This is contrast to rdd join,
> where it would be detected at compile time.
>
> am I missing something?
>
> thanks
>
>


Re: UDF with column value comparison fails with PySpark

2016-11-10 Thread Davies Liu
On Thu, Nov 10, 2016 at 11:14 AM, Perttu Ranta-aho  wrote:
> Hello,
>
> I want to create an UDF which modifies one column value depending on value
> of some other column. But Python version of the code fails always in column
> value comparison. Below are simple examples, scala version works as expected
> but Python version throws an execption. Am I missing something obvious? As
> can be seen from PySpark exception I'm using Spark 2.0.1.
>
> -Perttu
>
> import org.apache.spark.sql.functions.udf
> val df = spark.createDataFrame(List(("a",1), ("b",2), ("c",
> 3))).withColumnRenamed("_1", "name").withColumnRenamed("_2", "value")
> def myUdf = udf((name: String, value: Int) => {if (name == "c") { value * 2
> } else { value }})
> df.withColumn("udf", myUdf(df("name"), df("value"))).show()
> ++-+---+
> |name|value|udf|
> ++-+---+
> |   a|1|  1|
> |   b|2|  2|
> |   c|3|  6|
> ++-+---+
>
>
> from pyspark.sql.types import StringType, IntegerType
> import pyspark.sql.functions as F
>
> df = sqlContext.createDataFrame((('a',1), ('b',2), ('c', 3)),
> ('name','value'))
>
> def my_udf(name, value):
> if name == 'c':
> return value * 2
> return value
> F.udf(my_udf, IntegerType())

udf = F.udf(my_udf, IntegerType())
df.withColumn("udf", udf(df.name, df.value)).show()

>
> df.withColumn("udf", my_udf(df.name, df.value)).show()
>
> ---
> ValueErrorTraceback (most recent call last)
>  in ()
> > 1 df.withColumn("udf", my_udf(df.name, df.value)).show()
>
>  in my_udf(name, value)
>   3
>   4 def my_udf(name, value):
> > 5 if name == 'c':
>   6 return value * 2
>   7 return value
>
> /home/ec2-user/spark-2.0.1-bin-hadoop2.4/python/pyspark/sql/column.pyc in
> __nonzero__(self)
> 425
> 426 def __nonzero__(self):
> --> 427 raise ValueError("Cannot convert column into bool: please
> use '&' for 'and', '|' for 'or', "
> 428  "'~' for 'not' when building DataFrame
> boolean expressions.")
> 429 __bool__ = __nonzero__
>
> ValueError: Cannot convert column into bool: please use '&' for 'and', '|'
> for 'or', '~' for 'not' when building DataFrame boolean expressions.




UDF with column value comparison fails with PySpark

2016-11-10 Thread Perttu Ranta-aho
Hello,

I want to create a UDF which modifies one column's value depending on the value
of some other column. But the Python version of the code always fails in the
column value comparison. Below are simple examples; the Scala version works as
expected, but the Python version throws an exception. Am I missing something
obvious? As can be seen from the PySpark exception, I'm using Spark 2.0.1.

-Perttu

import org.apache.spark.sql.functions.udf
val df = spark.createDataFrame(List(("a",1), ("b",2), ("c",
3))).withColumnRenamed("_1", "name").withColumnRenamed("_2", "value")
def myUdf = udf((name: String, value: Int) => {if (name == "c") { value * 2
} else { value }})
df.withColumn("udf", myUdf(df("name"), df("value"))).show()
++-+---+
|name|value|udf|
++-+---+
|   a|1|  1|
|   b|2|  2|
|   c|3|  6|
++-+---+


from pyspark.sql.types import StringType, IntegerType
import pyspark.sql.functions as F

df = sqlContext.createDataFrame((('a',1), ('b',2), ('c', 3)),
('name','value'))

def my_udf(name, value):
if name == 'c':
return value * 2
return value
F.udf(my_udf, IntegerType())

df.withColumn("udf", my_udf(df.name, df.value)).show()

---
ValueErrorTraceback (most recent call last)
 in ()
> 1 df.withColumn("udf", my_udf(df.name, df.value)).show()

 in my_udf(name, value)
  3
  4 def my_udf(name, value):
> 5 if name == 'c':
  6 return value * 2
  7 return value

/home/ec2-user/spark-2.0.1-bin-hadoop2.4/python/pyspark/sql/column.pyc in
__nonzero__(self)
425
426 def __nonzero__(self):
--> 427 raise ValueError("Cannot convert column into bool: please
use '&' for 'and', '|' for 'or', "
428  "'~' for 'not' when building DataFrame
boolean expressions.")
429 __bool__ = __nonzero__

ValueError: Cannot convert column into bool: please use '&' for 'and', '|'
for 'or', '~' for 'not' when building DataFrame boolean expressions.


type-safe join in the new DataSet API?

2016-11-10 Thread Yang
the new DataSet API is supposed to provide type safety and type checks at
compile time
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

It does this indeed for a lot of places, but I found it still doesn't have
a type safe join:

val ds1 = hc.sql("select col1, col2 from mytable")

val ds2 = hc.sql("select col3 , col4 from mytable2")

val ds3 = ds1.joinWith(ds2, ds1.col("col1") === ds2.col("col2"))

here spark has no way to make sure (at compile time) that the two columns
being joined together
, "col1" and "col2" are of matching types. This is contrast to rdd join,
where it would be detected at compile time.

am I missing something?

thanks


Spark Streaming: question on sticky session across batches ?

2016-11-10 Thread Manish Malhotra
Hello Spark Devs/Users,

I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
batch (say 2 mins), data needs to go to the same reducer node after
grouping by key.
The underlying storage is Cassandra and not HDFS.

This is a map-reduce job, where we are also trying to use the partitions of the
Cassandra table to batch the data for the same partition.

The requirement of a sticky session/partition across batches is because the
operations we need to do have to read the data for every key and then merge it
with the current batch's aggregate values. Currently, with no stickiness across
batches, we have to read for every key, merge, and then write back, and the
reads are very expensive. With a sticky session we could avoid the read in
every batch and keep a cache of the aggregates up to the last batch.

So, there are few options, can think of:

1. Change the TaskSchedulerImpl, as it uses Random to identify the
node for the mapper/reducer before starting the batch/phase.
Not sure if there is a custom-scheduler way of achieving this?

2. Can a custom RDD help to pin the key-->node mapping?
There is a getPreferredLocations() method.
But I'm not sure whether this will be persistent or can vary in some edge
cases (a sketch follows below)?

Thanks in advance for you help and time !

Regards,
Manish
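
On option 2 above, a minimal sketch of a pass-through RDD that pins each partition
index to a fixed host (the `hosts` list is a hypothetical, stable list of worker
hostnames). Note that preferred locations are scheduling hints, not guarantees,
so the scheduler may still run a task elsewhere:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, TaskContext}

class PinnedLocationRDD[T: ClassTag](parent: RDD[T], hosts: Seq[String])
    extends RDD[T](parent) {

  override protected def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)

  // Same partition index -> same preferred host in every batch
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index % hosts.size))
}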


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-10 Thread Ivan von Nagy
Ok, I have split the KafkaRDD logic so each uses its own group and bumped
the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms ends
up with a timeout and exception, so I am still perplexed on that one. The
new error I am getting now is a `ConcurrentModificationException` when
Spark is trying to remove the CachedKafkaConsumer.

java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)

Here is the basic logic:

*Using KafkaRDD* - This takes a list of channels and processes them in
parallel using the KafkaRDD directly. They each use a distinct consumer
group (s"$prefix-$topic"), each has its own topic, and each topic has 4
partitions. We routinely get timeout errors when polling for data when the
poll.ms is less than 2 seconds. This occurs whether or not we process in parallel.

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group "$prefix-$topic"

  val offsetRanges = getOffsets(s"$prefix-$topic", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
kafkaParams asJava,
offsetRanges,
PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}

I am not sure why the concurrency issue is there, as I have tried to debug
and also looked at the KafkaConsumer code, but everything looks
like it should not occur. The things to figure out are why this occurs when
running in parallel, and also why the timeouts still occur.

Thanks,

Ivan
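
For the streaming (driver) path touched on in the quoted discussion below, the
pattern from the spark-streaming-kafka-0-10 integration looks roughly like this
(`stream` is the DStream returned by KafkaUtils.createDirectStream, and the
processing step is a placeholder). The commit is asynchronous and happens on the
driver some time after the batch is written, which is why an idempotent sink
matters:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // write the batch to an idempotent store first
  // process(rdd)                       // placeholder

  // then hand the offsets back to Kafka (driver-side, asynchronous)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}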

On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger  wrote:

> There definitely is Kafka documentation indicating that you should use
> a different consumer group for logically different subscribers, this
> is really basic to Kafka:
>
> http://kafka.apache.org/documentation#intro_consumers
>
> As for your comment that "commit async after each RDD, which is not
> really viable also", how is it not viable?  Again, committing offsets
> to Kafka doesn't give you reliable delivery semantics unless your
> downstream data store is idempotent.  If your downstream data store is
> idempotent, then it shouldn't matter to you when offset commits
> happen, as long as they happen within a reasonable time after the data
> is written.
>
> Do you want to keep arguing with me, or follow my advice and proceed
> with debugging any remaining issues after you make the changes I
> suggested?
>
> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy  wrote:
> > With our stream version, we update the offsets for only the partition we
> > are operating on. We even break down the partition into smaller batches and
> then
> > update the offsets after each batch within the partition. With Spark 1.6
> and
> > Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
> > necessarily a Spark issue since Kafka no longer allows you to simply
> update
> > the offsets for a given consumer group. You have to subscribe or assign
> > partitions to even do so.
> >
> > As for storing the offsets in some other place like a DB, I don't find
> this
> > useful because you then can't use tools like Kafka Manager. In order to
> do
> > so you would have to store in a DB and then circle back and update Kafka
> > afterwards. This means you have to keep two sources in sync which is not
> > really a good idea.
> >
> > It is a challenge in Spark to use the Kafka offsets since the driver keeps
> > subscribed to the topic(s) and consumer group, while the executors
> prepend
> > "spark-executor-" to the consumer group. The stream (driver) does allow
> you
> > to commit async after each RDD, which is not really viable also. I have
> thought
> > of implementing an Akka actor system on the driver and send it messages
> from
> > the executor code to update the offsets, but then that is asynchronous as
> > well so not really a good solution.
> >
> > I have no idea why Kafka made this change and also why in the parallel
> > KafkaRDD application we would be advised to use different consumer groups
> > for each RDD. That seems strange to me that different consumer groups
> would
> > be required or advised. There is no Kafka documentation that I know if
> that
> > states this. The biggest issue I see with the parallel KafkaRDD is the
> > timeouts. I have tried to set poll.ms to 30 seconds and still get the
> issue.
> > Something 

will spark aggregate and treeaggregate cause a shuffle action?

2016-11-10 Thread codlife
Hi users:
  I'm in doubt about whether Spark's aggregate and treeAggregate cause a shuffle
action. If not, how does Spark do the combine step internally? Any tips are
appreciated, thank you!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/will-spark-aggregate-and-treeaggregate-case-a-shuflle-action-tp28060.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: If we run sc.textfile(path,xxx) many times, will the elements be the same in each partition

2016-11-10 Thread Prashant Sharma
+user -dev

Since the same hash-based partitioner is in action by default, in my
understanding the same partitioning will happen every time.
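
One way to check this empirically (a sketch; `path` and the partition count are
placeholders):

val path = "hdfs:///some/dir/input.txt"          // placeholder
val a = sc.textFile(path, 8).glom().collect()    // array of per-partition contents
val b = sc.textFile(path, 8).glom().collect()
println(a.length == b.length &&
  a.zip(b).forall { case (x, y) => x.sameElements(y) })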

Thanks,

On Nov 10, 2016 7:13 PM, "WangJianfei" 
wrote:

> Hi devs:
> If I run sc.textFile(path, xxx) many times, will the elements be the
> same (same elements, same order) in each partition?
> My experiments show that they are the same, but that may not cover all the
> cases. Thank you!
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/If-we-run-sc-
> textfile-path-xxx-many-times-will-the-elements-be-the-same-
> in-each-partition-tp19814.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-10 Thread Cody Koeninger
The basic structured streaming source for Kafka is already committed to
master, build it and try it out.

If you're already using Kafka I don't really see much point in trying to
put Akka in between it and Spark.

On Nov 10, 2016 02:25, "vincent gromakowski" 
wrote:

I have already integrated common actors. I am also interested, especially to
see how we can achieve end-to-end back pressure.

2016-11-10 8:46 GMT+01:00 shyla deshpande :

> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
>


Re: importing data into hdfs/spark using Informatica ETL tool

2016-11-10 Thread Mich Talebzadeh
Sounds like the only option Informatica has for Hadoop is a connector to Hive,
and as I read it, it connects to the Hive Thrift server.

The tool is called Informatica Cloud Connector and it is an add-on, which means
that it is not part of the standard Informatica offering.

Anyway, if we can use Informatica to get the data from the source as-is and
create a file-system format (CSV, etc.) as the target, then those files can land
in a directory. A cron job can then put those files into HDFS directories, and
the rest is our choice of how to treat those files. Hive external tables can be
used, plus Hive or Spark to periodically ingest the data into target tables in
Hive. I will still go for ORC tables. Data will be append-only.
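
A sketch of that Hive-side flow via Spark SQL (all database, table, column and
path names are illustrative, and the session needs Hive support enabled):

// External table over the CSV landing directory
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS staging.informatica_feed (
    id STRING, event_ts STRING, amount DOUBLE)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/data/landing/informatica_feed'""")

// ORC-backed target table
spark.sql("""
  CREATE TABLE IF NOT EXISTS target.feed_orc (
    id STRING, event_ts STRING, amount DOUBLE)
  STORED AS ORC""")

// Run from the periodic (cron-triggered) job; append-only
spark.sql("""
  INSERT INTO TABLE target.feed_orc
  SELECT id, event_ts, amount FROM staging.informatica_feed""")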

That is my conclusion, but I am still open to suggestions.

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 November 2016 at 08:11, Mich Talebzadeh 
wrote:

>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> Forwarded conversation
> Subject: importing data into hdfs/spark using Informatica ETL tool
> 
>
> From: Mich Talebzadeh 
> Date: 9 November 2016 at 13:56
> To: "user @spark" 
>
>
> Hi,
>
> I am exploring the idea of flexibly importing the multiple RDBMS tables that
> the customer has into HDFS, using Informatica.
>
> I don't want to use connectivity tools from Informatica to Hive etc.
>
> So this is what I have in mind
>
>
>1. If possible get the tables data out using Informatica and use
>Informatica ui  to convert RDBMS data into some form of CSV, TSV file (Can
>Informatica do it?) I guess yes
>2. Put the flat files on an edge where HDFS node can see them.
>3. Assuming that a directory can be created by Informatica daily,
>periodically run a cron that ingest that data from directories into HDFS
>equivalent daily directories
>4. Once the data is in HDFS one can use, Spark csv, Hive etc to query
>data
>
> The problem I have is to see if someone has done such a thing before.
> Specifically, can Informatica create target flat files in normal directories?
>
> Any other generic alternative?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> --
> From: Michael Segel 
> Date: 9 November 2016 at 16:14
> To: Mich Talebzadeh 
> Cc: "user @spark" 
>
>
> Mich,
>
> You could do that. But really?
>
> Putting on my solutions architect hat…
>
> You or your client is spending $$$ for product licensing and you’re not
> really using the product to its fullest.
>
> Yes, you can use Informatica to pull data from the source systems and
> provide some data cleansing and transformations before you drop it on your
> landing zone.
>
> If you’re going to bypass Hive, then you have to capture the schema,
> including data types.  You’re also going to have to manage schema evolution
> as they change over time. (I believe the ETL tools will do this for you or
> help in the process.)
>
> But if you’re already working on the consumption process for ingestion on
> your own… what is the value that you derive from using Informatica?  Is the
> unloading and ingestion process that difficult that you can’t write that as
> well?
>
> My point is that if you’re going to use the tool, use it as the vendor
> 

RE: Re:RE: how to merge dataframe write output files

2016-11-10 Thread Mendelson, Assaf
As people stated, when you coalesce to 1 partition you basically lose all 
parallelism; however, you can coalesce to a different value.
If, for example, you coalesce to 20, then you can parallelize up to 20 different 
tasks.
You have a total of 4 executors, with 2 cores each. This means that you 
basically have a core parallelism of 8. In general it is best to have a number 
of tasks which is 2-3 times that number for better distribution, so ~20 tasks 
would be a good idea. Looking at your output I see part 00176, which I guess 
means you have on the order of 200 tasks (which is the default parallelism when 
you have a shuffle, for example).
Coalescing to 20 would still give you enough parallelism to use your cluster 
and would give you fewer, bigger files.
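
A minimal sketch of that suggestion, reusing the loop from the quoted test case
below (20 is only the rule of thumb above; adjust to taste):

for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*, biztag.tag_id from biz1608 left join biztag " +
    s"on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(20).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}
// Alternatively, lower the shuffle parallelism so the join itself produces ~20 partitions:
// spark.conf.set("spark.sql.shuffle.partitions", "20")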
Assaf.


From: Shreya Agarwal [mailto:shrey...@microsoft.com]
Sent: Thursday, November 10, 2016 10:28 AM
To: lk_spark
Cc: user.spark
Subject: RE: Re:RE: how to merge dataframe write output files

Your coalesce should technically work - One thing to check would be overhead 
memory. You should configure it as 10% of executor memory.  Also, you might 
need to increase maxResultSize. Also, the data looks fine for the cluster 
unless your join yields >6G worth of data. Few things to try -

  1.  Can you do a cache on both the DFs and try?
  2.  Can you do a cache on both, then use scala join on DFs instead of loading 
to sql ?
  3.  Can you try dataset instead of dataframe? (assuming you are on Spark 2.0)
  4.  If you still see errors, can you check YARN logs, or post here?

I am sorry I don't know the answer to this,  but pretty sure there should be a 
way to work with fragmented files too.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal
Cc: user.spark
Subject: Re:RE: how to merge dataframe write output files

thank you for the reply, Shreya:
It's because the files are too small and HDFS doesn't like small files.
For your question: yes, I want to create an external table on the parquet file
folder. And how would I use fragmented files as you mention?

the tests case as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g 
--executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for(i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left 
join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
 }



At 2016-11-10 15:47:02, "Shreya Agarwal" wrote:
Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow and also it'll take 
away all the speed of reading in the data from a parquet as there won't be any 
parallelism at the time of input (if you try to input this parquet).

Again, the important question is - Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it'll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark
Subject: how to merge dataframe write output files

hi, all:
when I call the df.write.parquet API, there are a lot of small files: how can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it will 
sometimes get an error

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 

RE: Strongly Connected Components

2016-11-10 Thread Shreya Agarwal
Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?
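
For reference, a bare-bones GraphX invocation of SCC (a sketch only: the
edge-list path is hypothetical, numIter is a placeholder, and a smaller numIter
trades completeness for runtime):

import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// edges.txt is a hypothetical whitespace-separated "srcId dstId" edge list
val graph = GraphLoader.edgeListFile(sc, "hdfs:///graph/edges.txt",
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
val numIter = 3  // placeholder; SCC iterates at most this many times
val scc = graph.stronglyConnectedComponents(numIter)
scc.vertices.take(10).foreach(println)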

Regards,
Shreya


Re: SparkLauncer 2.0.1 version working incosistently in yarn-client mode

2016-11-10 Thread Elkhan Dadashov
Thanks Marcelo.

I changed the code using CountDownLatch, and it works as expected.

...
final CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
Thread sparkAppListenerThread = new Thread(sparkAppListener);
sparkAppListenerThread.start();
long timeout = 120;
countDownLatch.await(timeout, TimeUnit.SECONDS);
...

private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
    private static final Log log = LogFactory.getLog(SparkAppListener.class);
    private final CountDownLatch countDownLatch;

    public SparkAppListener(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void stateChanged(SparkAppHandle handle) {
        String sparkAppId = handle.getAppId();
        State appState = handle.getState();
        if (sparkAppId != null) {
            log.info("Spark job with app id: " + sparkAppId + ",\tState changed to: "
                    + appState + " - " + SPARK_STATE_MSG.get(appState));
        } else {
            log.info("Spark job's state changed to: " + appState + " - "
                    + SPARK_STATE_MSG.get(appState));
        }
        if (appState != null && appState.isFinal()) {
            countDownLatch.countDown();
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {}

    @Override
    public void run() {}
}


On Mon, Nov 7, 2016 at 9:46 AM Marcelo Vanzin  wrote:

> On Sat, Nov 5, 2016 at 2:54 AM, Elkhan Dadashov 
> wrote:
> > while (appHandle.getState() == null || !appHandle.getState().isFinal()) {
> >     if (appHandle.getState() != null) {
> >         log.info("while: Spark job state is : " + appHandle.getState());
> >         if (appHandle.getAppId() != null) {
> >             log.info("\t App id: " + appHandle.getAppId() + "\tState: " + appHandle.getState());
> >         }
> >     }
> > }
>
> This is a ridiculously expensive busy loop, even more so if you
> comment out the log lines. Use listeners, or at least sleep a little
> bit every once in a while. You're probably starving other processes /
> threads of cpu.
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: Re:RE: how to merge dataframe write output files

2016-11-10 Thread Shreya Agarwal
Your coalesce should technically work - One thing to check would be overhead 
memory. You should configure it as 10% of executor memory.  Also, you might 
need to increase maxResultSize. Also, the data looks fine for the cluster 
unless your join yields >6G worth of data. Few things to try -

  1.  Can you do a cache on both the DFs and try?
  2.  Can you do a cache on both, then use scala join on DFs instead of loading 
to sql ?
  3.  Can you try dataset instead of dataframe? (assuming you are on Spark 2.0)
  4.  If you still see errors, can you check YARN logs, or post here?

I am sorry I don't know the answer to this,  but pretty sure there should be a 
way to work with fragmented files too.
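
A quick sketch of suggestions 1 and 2 (cache both sides, then join through the
DataFrame API), using the paths and column names from the quoted test case
below; the tag value 1 is only an example:

val biz1608 = spark.read.parquet("/parquetdata/weixin/biz/month=201608").cache()
val biztag  = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/").cache()
// materialize both caches once, before running the per-tag queries
biz1608.count(); biztag.count()

val joined = biz1608.join(biztag, biz1608("bid") === biztag("biz_id"), "left")
joined.filter(biztag("tag_id") === 1).show(5)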

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal 
Cc: user.spark 
Subject: Re:RE: how to merge dataframe write output files

thank you for the reply, Shreya:
It's because the files are too small and HDFS doesn't like small files.
For your question: yes, I want to create an external table on the parquet file
folder. And how would I use fragmented files as you mention?

the tests case as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g 
--executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for(i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left 
join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
 }



At 2016-11-10 15:47:02, "Shreya Agarwal" wrote:

Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow and also it'll take 
away all the speed of reading in the data from a parquet as there won't be any 
parallelism at the time of input (if you try to input this parquet).

Again, the important question is - Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it'll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark
Subject: how to merge dataframe write output files

hi, all:
when I call the df.write.parquet API, there are a lot of small files: how can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it will 
sometimes get an error

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet

Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-10 Thread vincent gromakowski
I have already integrated common actors. I am also interested, especially to
see how we can achieve end-to-end back pressure.
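
For what it's worth, a minimal custom-receiver sketch showing the shape of a
source that an Akka Stream sink could push records into. This is not the Akka
connector itself (the Akka-specific receiver utilities moved out of core Spark
around 2.0, into Apache Bahir if I recall correctly), and everything below is
an illustration rather than working integration code:

import java.util.concurrent.LinkedBlockingQueue
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A queue that an Akka Stream sink (not shown here) would offer records into.
class QueueReceiver(queue: LinkedBlockingQueue[String])
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  @volatile private var running = true

  def onStart(): Unit = {
    new Thread("queue-receiver") {
      override def run(): Unit = {
        while (running && !isStopped()) {
          store(queue.take())  // blocks until the upstream sink offers a record
        }
      }
    }.start()
  }

  def onStop(): Unit = { running = false }
}

// usage: val lines = ssc.receiverStream(new QueueReceiver(sharedQueue))

A bounded queue only gives crude back pressure (the Akka side blocks when the
queue is full); proper end-to-end back pressure is exactly the part this sketch
does not solve.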

2016-11-10 8:46 GMT+01:00 shyla deshpande :

> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
>


Re: Unable to lauch Python Web Application on Spark Cluster

2016-11-10 Thread Daniel van der Ende
Hi Anjali,

It would help to see the code. But more importantly: why do you want to
deploy a web application on a Spark cluster? Spark is meant for
distributed, in-memory computations. I don't know what your application
is doing, but it would make more sense to run it outside the Spark cluster,
and then submit computation tasks if/when necessary.

Daniel

On Thu, Nov 10, 2016 at 9:03 AM, anjali gautam 
wrote:

>
> -- Forwarded message --
> From: anjali gautam 
> Date: Thu, Nov 10, 2016 at 12:01 PM
> Subject: Unable to lauch Python Web Application on Spark Cluster
> To: user@spark.apache.org
>
>
> Hello Everyone,
>
> I have developed a web application (say abc) in Python using web.py. I
> want to deploy it on the Spark cluster. Since this application is a project
> with dependencies, I have made a zip file of the project (abc.zip)
> to be deployed on the cluster. In the project abc I have to execute the
> application.py file, which has the main method to start my web application.
> The command to execute the project is:
>
> bin/spark-submit --master spark://master:7077 --py-files abc.zip
> application.py
>
> The structure of the project abc is that it has a data folder which has a
> file data.txt. The other python files in the project abc access that
> data.txt.
>
> Executing the above command and then opening the browser to access the
> developed web application results in a "data/data.txt not found" error,
> even though the file is actually present.
>
> I have been trying this for a long time, but every time it shows the same
> error. Can anybody help me in this case? Please let me know if any other
> info is required.
>
> Thanks & Regards,
> Anjali
>
>
>
>
>


-- 
Daniel


Re:RE: how to merge dataframe write output files

2016-11-10 Thread lk_spark
thank you for the reply, Shreya:
It's because the files are too small and HDFS doesn't like small files.
For your question: yes, I want to create an external table on the parquet file
folder. And how would I use fragmented files as you mention?


the tests case as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g 
--executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for(i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left 
join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
 }
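
One alternative worth sketching (not something asked in the thread, and the
output root below is hypothetical): do the join once and let the writer split
the output by tag, which avoids both the 61 separate jobs and the coalesce(1)
writes:

val joined = spark.sql(
  "select biz1608.*, biztag.tag_id from biz1608 " +
  "left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id is not null")
joined.write.partitionBy("tag_id").parquet("/parquetdata/weixin/biztags_bytag")

Each tag then lands under a tag_id=<n>/ subdirectory of one root, and readers
can still pick a single tag via partition pruning.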





At 2016-11-10 15:47:02, "Shreya Agarwal"  wrote:


Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

 

Not to mention that it’ll make your write incredibly slow and also it’ll take 
away all the speed of reading in the data from a parquet as there won’t be any 
parallelism at the time of input (if you try to input this parquet).

 

Again, the important question is – Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it’ll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.

 

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark 
Subject: how to merge dataframe write output files

 

hi, all:

when I call the df.write.parquet API, there are a lot of small files: how can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it will 
sometimes get an error

Container exited with a non-zero exit code 143

more and more... 

-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc

more and more... 

2016-11-10

lk_spark

Fwd: Unable to lauch Python Web Application on Spark Cluster

2016-11-10 Thread anjali gautam
-- Forwarded message --
From: anjali gautam 
Date: Thu, Nov 10, 2016 at 12:01 PM
Subject: Unable to lauch Python Web Application on Spark Cluster
To: user@spark.apache.org


Hello Everyone,

I have developed a web application (say abc) in Python using web.py. I want
to deploy it on the Spark cluster. Since this application is a project with
dependencies, I have made a zip file of the project (abc.zip) to
be deployed on the cluster. In the project abc I have to execute the
application.py file, which has the main method to start my web application.
The command to execute the project is:

bin/spark-submit --master spark://master:7077 --py-files abc.zip
application.py

The structure of the project abc is that it has a data folder which has a
file data.txt. The other python files in the project abc access that
data.txt.

Executing the above command and then opening the browser to access the
developed web application results in a "data/data.txt not found" error,
even though the file is actually present.

I have been trying this for a long time, but every time it shows the same
error. Can anybody help me in this case? Please let me know if any other
info is required.

Thanks & Regards,
Anjali