Re: Data Duplication Bug Found - Structured Streaming Versions 3.4.1, 3.2.4, and 3.3.2

2023-09-18 Thread Jerry Peng
Hi Craig,

Thank you for sending us more information.  Can you answer my previous
question, which I don't think the document addresses: how did you determine
there were duplicates in the output, and how was the output data read? The
FileStreamSink provides exactly-once writes ONLY if you read the output with
the FileStreamSource or the FileSource (batch).  A log is used to track
which data files are committed, and those sources know how to use that log
to read the data exactly once.  So there may be duplicated data written on
disk, and if you simply read the data files on disk directly you may see
duplicates when there are failures.  However, if you read the output
location with Spark you should get exactly-once results (unless there is a
bug), since Spark knows how to use the commit log to determine which data
files are committed and which are not.
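The read-path distinction described above can be sketched with a toy example (the file names, the `_commit_log` name, and the log format here are illustrative only; Spark's real `_spark_metadata` log is more involved):

```python
import os
import tempfile

# Toy illustration of the idea behind FileStreamSink's commit log: files on
# disk may include uncommitted duplicates, and only a log-aware reader sees
# exactly-once output.
out = tempfile.mkdtemp()

# Simulate a failure: batch 1 was written twice, but committed only once.
for name in ["part-0000.txt", "part-0001.txt", "part-0001-retry.txt"]:
    with open(os.path.join(out, name), "w") as f:
        f.write("data\n")

# The commit log records only the files that belong to committed batches.
committed = {"part-0000.txt", "part-0001.txt"}
with open(os.path.join(out, "_commit_log"), "w") as f:
    f.write("\n".join(sorted(committed)))

# Naive reader: lists the directory and sees the duplicate.
naive = [n for n in os.listdir(out) if not n.startswith("_")]

# Log-aware reader (what Spark's file source does conceptually): uses the log.
with open(os.path.join(out, "_commit_log")) as f:
    log_aware = f.read().splitlines()

print(len(naive))      # 3 files on disk, including the duplicate
print(len(log_aware))  # 2 committed files, the exactly-once view
```

The point of the sketch: duplicates on disk are expected under failures; exactly-once is a property of the log-aware reader, not of the bytes on disk.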

Best,

Jerry

On Mon, Sep 18, 2023 at 1:18 PM Craig Alfieri 
wrote:

> Hi Russell/Jerry/Mich,
>
>
>
> Appreciate your patience on this.
>
>
>
> Attached are more details on how this duplication “error” was found.
>
> Since we’re still unsure, I am using “error” in quotes.
>
>
>
> We’d love the opportunity to work with any of you directly and/or the
> wider Spark community to triage this or get a better understanding of the
> nature of what we’re experiencing.
>
>
>
> Our platform provides the ability to fully reproduce this.
>
>
>
> Once you have had the chance to review the attached draft, let us know if
> there are any questions in the meantime. Again, we welcome the opportunity
> to work with the teams on this.
>
>
>
> Best-
>
> Craig
>
>
>
>
>
>
>
> *From: *Craig Alfieri 
> *Date: *Thursday, September 14, 2023 at 8:45 PM
> *To: *russell.spit...@gmail.com 
> *Cc: *Jerry Peng , Mich Talebzadeh <
> mich.talebza...@gmail.com>, user@spark.apache.org ,
> connor.mc...@antithesis.com 
> *Subject: *Re: Data Duplication Bug Found - Structured Streaming Versions
> 3.4.1, 3.2.4, and 3.3.2
>
> Hi Russell et al,
>
>
>
> Acknowledging receipt; we’ll get these answers back to the group.
>
>
>
> Follow-up forthcoming.
>
>
>
> Craig
>
>
>
>
>
>
>
> On Sep 14, 2023, at 6:38 PM, russell.spit...@gmail.com wrote:
>
> Exactly once should be output sink dependent, what sink was being used?
>
> Sent from my iPhone
>
>
>
> On Sep 14, 2023, at 4:52 PM, Jerry Peng 
> wrote:
>
> 
>
> Craig,
>
>
>
> Thanks! Please let us know the result!
>
>
>
> Best,
>
>
>
> Jerry
>
>
>
> On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
>
> Hi Craig,
>
>
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
>
>
> HTH
>
>
> Mich Talebzadeh,
>
> Distinguished Technologist, Solutions Architect & Engineer
>
> London
>
> United Kingdom
>
>
>
>  view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
> Hello Spark Community-
>
>
>
> As part of a research effort, our team here at Antithesis tests for
> correctness/fault tolerance of major OSS projects.
>
> Our team recently was testing Spark’s Structured Streaming, and we came
> across a data duplication bug we’d like to work with the teams on to
> resolve.
>
>
>
> Our intention is to utilize this as a future case study for our platform,
> but prior to doing so we'd like to have a resolution in place so that an
> announcement isn’t alarming to the user base.
>
>
>
> Attached is a high level .pdf that reviews the High Availability set-up
> put under test.
>
> This was also tested across the three latest versions, and the same
> behavior was observed.
>
>
>
> We can reproduce this error readily since our environment is fully
> deterministic; we are just not Spark experts and would like to work with
> someone in the community to resolve this.
>
>
>
> Please let us know at your earliest convenience.
>
>
>
> Best
>
>
>
>

Re: Data Duplication Bug Found - Structured Streaming Versions 3.4.1, 3.2.4, and 3.3.2

2023-09-14 Thread Jerry Peng
Craig,

Thanks! Please let us know the result!

Best,

Jerry

On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh 
wrote:

>
> Hi Craig,
>
> Can you please clarify what this bug is and provide sample code causing
> this issue?
>
> HTH
>
> Mich Talebzadeh,
> Distinguished Technologist, Solutions Architect & Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 14 Sept 2023 at 17:48, Craig Alfieri 
> wrote:
>
>> Hello Spark Community-
>>
>>
>>
>> As part of a research effort, our team here at Antithesis tests for
>> correctness/fault tolerance of major OSS projects.
>>
>> Our team recently was testing Spark’s Structured Streaming, and we came
>> across a data duplication bug we’d like to work with the teams on to
>> resolve.
>>
>>
>>
>> Our intention is to utilize this as a future case study for our platform,
>> but prior to doing so we'd like to have a resolution in place so that an
>> announcement isn’t alarming to the user base.
>>
>>
>>
>> Attached is a high level .pdf that reviews the High Availability set-up
>> put under test.
>>
>> This was also tested across the three latest versions, and the same
>> behavior was observed.
>>
>>
>>
>> We can reproduce this error readily since our environment is fully
>> deterministic; we are just not Spark experts and would like to work with
>> someone in the community to resolve this.
>>
>>
>>
>> Please let us know at your earliest convenience.
>>
>>
>>
>> Best
>>
>>
>>
>>
>> *Craig Alfieri*
>>
>> c: 917.841.1652
>>
>> craig.alfi...@antithesis.com
>>
>> New York, NY.
>>
>> Antithesis.com
>> 
>>
>>
>>
>> We can't talk about most of the bugs that we've found for our customers,
>>
>> but some customers like to speak about their work with us:
>>
>> https://github.com/mongodb/mongo/wiki/Testing-MongoDB-with-Antithesis
>>
>>
>>
>>
>>
>>
>> *-*
>> *This email and any files transmitted with it are confidential and
>> intended solely for the use of the individual or entity for whom they are
>> addressed. If you received this message in error, please notify the sender
>> and remove it from your system.*
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Unsubscribe

2023-05-01 Thread peng





Re: Portability of dockers built on different cloud platforms

2023-04-05 Thread Ken Peng




ashok34...@yahoo.com.INVALID wrote:
Is it possible to use a Spark Docker image built on GCP on AWS without 
rebuilding it from scratch on AWS?


I am using the spark image from bitnami for running on k8s.
And yes, it's deployed by helm.


--
https://kenpeng.pages.dev/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2023-01-20 Thread peng


unsubscribe



Re: A simple comparison for three SQL engines

2022-04-09 Thread Wes Peng

May I forward this report to the Spark list as well?

Thanks.

Wes Peng wrote:

Hello,

This weekend I ran a test against a big dataset; spark, drill, mysql and 
postgresql were involved.


This is the final report:
https://blog.cloudcache.net/handles-the-file-larger-than-memory/

The simple conclusion:
1. spark is the fastest for this scale of data and limited memory
2. drill is close to spark
3. postgresql has surprising behavior in query speed
4. mysql is really slow

If you have found any issue please let me know.

Thanks

Wes Peng wrote:

Sure. I will take time to do it.


Sanel Zukan wrote:

Any chance you can try with Postgres >= 12, default configuration with
the same indexed columns as with MySQL?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executorlost failure

2022-04-07 Thread Wes Peng
I just did a test: even for a single node (local deployment), Spark can 
handle data whose size is much larger than the total memory.


My test VM (2g ram, 2 cores):

$ free -m
               total        used        free      shared  buff/cache   available
Mem:            1992        1845          92          19          54          36
Swap:           1023         285         738


The data size:

$ du -h rate.csv
3.2Grate.csv


Loading this file into spark for calculation can be done without error:

scala> val df = spark.read.format("csv").option("inferSchema", true).load("skydrive/rate.csv")
val df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more fields]


scala> df.printSchema
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)


scala> df.groupBy("_c1").agg(avg("_c2").alias("avg_rating")).orderBy(desc("avg_rating")).show
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`

+--+--+
|   _c1|avg_rating|
+--+--+
|000136|   5.0|
|0001711474|   5.0|
|0001360779|   5.0|
|0001006657|   5.0|
|0001361155|   5.0|
|0001018043|   5.0|
|000136118X|   5.0|
|202010|   5.0|
|0001371037|   5.0|
|401048|   5.0|
|0001371045|   5.0|
|0001203010|   5.0|
|0001381245|   5.0|
|0001048236|   5.0|
|0001436163|   5.0|
|000104897X|   5.0|
|0001437879|   5.0|
|0001056107|   5.0|
|0001468685|   5.0|
|0001061240|   5.0|
+--+--+
only showing top 20 rows


So as you see, Spark can handle a file larger than its memory well. :)

Thanks


rajat kumar wrote:

With autoscaling can have any numbers of executors.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executorlost failure

2022-04-07 Thread Wes Peng
I once had a 100+GB file getting computed on 3 nodes, each node 
having only 24GB of memory, and the job completed fine. So from my 
experience a Spark cluster seems to handle files larger than memory 
correctly by spilling them to disk.


Thanks

rajat kumar wrote:
Tested this with executors of size 5 cores, 17GB memory. Data vol is 
really high around 1TB


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Executorlost failure

2022-04-07 Thread Wes Peng

how many executors do you have?

rajat kumar wrote:
Tested this with executors of size 5 cores, 17GB memory. Data vol is 
really high around 1TB


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



query time comparison to several SQL engines

2022-04-07 Thread Wes Peng
I made a simple test of query times for several SQL engines, including 
mysql, hive, drill and spark. The report:

https://cloudcache.net/data/query-time-mysql-hive-drill-spark.pdf

It may have no special meaning, just for fun. :)

regards.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Profiling spark application

2022-01-19 Thread Wes Peng

Give a look at this:
https://github.com/LucaCanali/sparkMeasure

On 2022/1/20 1:18, Prasad Bhalerao wrote:
Is there any way we can profile spark applications that will show the number 
of invocations of spark APIs and their execution times, just the way 
jprofiler shows all the details?


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Pyspark] How to download Zip file from SFTP location and put in into Azure Data Lake and unzip it

2022-01-18 Thread Wes Peng
How large is the file? From my experience, reading the Excel file from the 
data lake and loading it as a dataframe works great.


Thanks
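
For the unzip step specifically, here is a minimal local sketch using only Python's standard library (the SFTP download, the Azure Data Lake upload, and reading the Excel sheets into DataFrames are out of scope and would use separate client libraries such as paramiko, azure-storage, or spark.read):

```python
import io
import tempfile
import zipfile

# Minimal sketch of the unzip step: take the raw bytes of a downloaded zip
# archive, extract it to a directory, and report the extracted file names.
def unzip_to_dir(zip_bytes: bytes, target_dir: str) -> list:
    """Extract an in-memory zip archive and return the extracted file names."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        zf.extractall(target_dir)
        return zf.namelist()

# Build a tiny zip in memory to demonstrate the round trip; "report.xlsx"
# stands in for the Excel file that would later be read into a DataFrame.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("report.xlsx", b"fake-excel-bytes")

names = unzip_to_dir(buf.getvalue(), tempfile.mkdtemp())
print(names)  # ['report.xlsx']
```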

On 2022-01-18 22:16, Heta Desai wrote:

Hello,

 I have zip files on SFTP location. I want to download/copy those
files and put into Azure Data Lake. Once the zip files get stored into
Azure Data Lake, I want to unzip those files and read using Data
Frames.

 The file format inside the zip is excel. So, once the files are unzipped, I
want to read the excel files using spark DataFrames.

 Please help me with the solution as soon as possible.

 Thanks,

 ​Heta Desai | Data | Sr Associate L1
e.heta.de...@1rivet.com | t. +91 966.225.4954

 ​

 This email, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If you are not the intended recipient,
please advise the sender immediately and delete this message and any
attachments. Unless otherwise specifically stated in this email,
transaction related information in this email, including attachments,
is not to be construed as an offer, solicitation or the basis or
confirmation for any contract for the purchase/sale of any services.
All email sent to or from this address will be received by 1Rivet US,
Inc and is subject to archival retention and review by someone other
than the recipient.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ivy unit test case filing for Spark

2021-12-21 Thread Wes Peng
Are you using a VPN that causes this problem? If the VPN software silently
changes network URLs, you should avoid using it.

Regards.

On Wed, Dec 22, 2021 at 1:48 AM Pralabh Kumar 
wrote:

> Hi Spark Team
>
> I am building a spark in VPN . But the unit test case below is failing.
> This is pointing to ivy location which  cannot be reached within VPN . Any
> help would be appreciated
>
> test("SPARK-33084: Add jar support Ivy URI -- default transitive = true") {
>   sc = new SparkContext(new SparkConf().setAppName("test")
>     .setMaster("local-cluster[3, 1, 1024]"))
>   sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
>   assert(sc.listJars().exists(_.contains(
>     "org.apache.hive_hive-storage-api-2.7.0.jar")))
>   assert(sc.listJars().exists(_.contains(
>     "commons-lang_commons-lang-2.6.jar")))
> }
>
> Error
>
> - SPARK-33084: Add jar support Ivy URI -- default transitive = true ***
> FAILED ***
> java.lang.RuntimeException: [unresolved dependency:
> org.apache.hive#hive-storage-api;2.7.0: not found]
> at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(
> SparkSubmit.scala:1447)
> at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(
> DependencyUtils.scala:185)
> at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(
> DependencyUtils.scala:159)
> at org.apache.spark.SparkContext.addJar(SparkContext.scala:1996)
> at org.apache.spark.SparkContext.addJar(SparkContext.scala:1928)
> at org.apache.spark.SparkContextSuite.$anonfun$new$115(SparkContextSuite.
> scala:1041)
> at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
>
> Regards
> Pralabh Kumar
>
>
>


Re: [ANNOUNCE] Apache Spark 3.2.0

2021-10-19 Thread Henrik Peng
Congrats and thanks!


Gengliang Wang 于2021年10月19日 周二下午10:16写道:

> Hi all,
>
> Apache Spark 3.2.0 is the third release of the 3.x line. With tremendous
> contribution from the open-source community, this release managed to
> resolve in excess of 1,700 Jira tickets.
>
> We'd like to thank our contributors and users for their contributions and
> early feedback to this release. This release would not have been possible
> without you.
>
> To download Spark 3.2.0, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-2-0.html
>


Re: Spark Session error with 30s

2021-04-12 Thread Peng Lei
Hi KhajaAsmath Mohammed
  Please check the configuration of "spark.speculation.interval"; try passing
a plain "30" to it.

 '''
  override def start(): Unit = {
    backend.start()

    if (!isLocal && conf.get(SPECULATION_ENABLED)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(
        () => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
        SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
 '''
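
The failure mode in this thread can be reproduced in miniature: a plain integer parse (what Hadoop's Configuration.getLong effectively attempts, per the stack trace quoted later in the thread) rejects a unit-suffixed value like "30s", while Spark-style duration parsing accepts it. A simplified, illustrative parser — not Spark's actual implementation:

```python
# Toy illustration of the "30s" NumberFormatException: a plain integer parse
# rejects a unit-suffixed duration, while a suffix-aware parser accepts it.
def parse_duration_ms(value: str) -> int:
    """Parse '30s' / '500ms' / bare numbers into milliseconds (simplified)."""
    suffixes = {"ms": 1, "s": 1000, "m": 60_000}
    # Try longer suffixes first so "ms" is not mistaken for "s".
    for suffix, factor in sorted(suffixes.items(), key=lambda kv: -len(kv[0])):
        if value.endswith(suffix):
            return int(value[: -len(suffix)]) * factor
    return int(value)  # bare number, assumed milliseconds here

print(parse_duration_ms("30s"))  # 30000

try:
    int("30s")  # what a plain getLong-style parse does
except ValueError as e:
    print("plain parse fails:", e)
```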


Sean Owen  于2021年4月13日周二 上午3:30写道:

> Something is passing this invalid 30s value, yes. Hard to say which
> property it is. I'd check if your cluster config sets anything with the
> value 30s - whatever is reading this property is not expecting it.
>
> On Mon, Apr 12, 2021, 2:25 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> Do you think anything that can cause this with DFS client?
>>
>> java.lang.NumberFormatException: For input string: "30s"
>> at
>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>> at java.lang.Long.parseLong(Long.java:589)
>> at java.lang.Long.parseLong(Long.java:631)
>>
>>
>>
>> * at
>> org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1429)
>>   at
>> org.apache.hadoop.hdfs.client.impl.DfsClientConf.(DfsClientConf.java:247)
>>   at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:301)
>> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:285)*
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:160)
>> at
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2859)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
>> at
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2896)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2878)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:392)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
>> at
>> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
>> at
>> org.apache.spark.deploy.yarn.Client$$anonfun$8.apply(Client.scala:137)
>> at scala.Option.getOrElse(Option.scala:121)
>> at org.apache.spark.deploy.yarn.Client.(Client.scala:137)
>> at
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:183)
>> at org.apache.spark.SparkContext.(SparkContext.scala:501)
>> at
>> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
>> at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:936)
>> at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession
>>
>> Thanks,
>> Asmath
>>
>> On Mon, Apr 12, 2021 at 2:20 PM KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> I am using spark hbase connector provided by hortonwokrs. I was able to
>>> run without issues in my local environment and has this issue in emr.
>>>
>>> Thanks,
>>> Asmath
>>>
>>> On Apr 12, 2021, at 2:15 PM, Sean Owen  wrote:
>>>
>>> 
>>> Somewhere you're passing a property that expects a number, but give it
>>> "30s". Is it a time property somewhere that really just wants MS or
>>> something? But most time properties (all?) in Spark should accept that type
>>> of input anyway. Really depends on what property has a problem and what is
>>> setting it.
>>>
>>> On Mon, Apr 12, 2021 at 1:56 PM KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
 HI,

 I am getting weird error when running spark job in emr cluster. Same
 program runs in my local machine. Is there anything that I need to do to
 resolve this?

 21/04/12 18:48:45 ERROR SparkContext: Error initializing SparkContext.
 java.lang.NumberFormatException: For input string: "30s"

 I tried the solution mentioned in the link below but it didn't work for
 me.


 https://hadooptutorials.info/2020/10/11/part-5-using-spark-as-execution-engine-for-hive-2/

 Thanks,
 Asmath

>>>


Question about how hadoop configurations populated in driver/executor pod

2021-03-22 Thread Yue Peng
Hi,

I am trying to run the SparkPi example via Spark on Kubernetes in my cluster. However, 
it is consistently failing because the executor does not have the correct 
hadoop configuration. I could fix it by pre-creating a ConfigMap and mounting 
it into the executor by specifying it in the pod template. But I do see in the official 
doc that the hadoop configuration will be serialized to executor pods.

Did I miss anything?

Error message in executor pod:
21/03/10 07:00:01 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.IOException: Incomplete HDFS URI, no host: 
hdfs:///tmp/spark-examples_2.12-3.0.125067.jar
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:170)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1853)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:737)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:522)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:871)
at 
org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:862)
at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:862)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:406)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)

More information:

https://issues.apache.org/jira/browse/SPARK-34684

Thanks,
Yue



Re: Unsubscribe

2020-12-22 Thread Wesley Peng

Bhavya Jain wrote:

Unsubscribe


please send an email to: user-unsubscr...@spark.apache.org to 
unsubscribe yourself from the list. thanks.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: unsubscribe

2020-06-27 Thread Wesley Peng
please send an empty email to: user-unsubscr...@spark.apache.org to 
unsubscribe yourself from the list.



Sri Kris wrote:
Sent from Mail  for 
Windows 10




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ML] [How-to]: How to unload the loaded W2V model in Pyspark?

2020-02-17 Thread Zhefu PENG
Hi all,

I'm using pyspark and Spark ML to train and use a Word2Vec model. Here is
the logic of my program:

model = Word2VecModel.load("save path")

result_list = model.findSynonymsArray(target, top_N)

Then I use the graphframe and result_list to create graph and do some
computing. However the program failed due to the out of memory error: xxx
is running beyond physical memory limits. As a result, I want to delete the
word2vec model to free memory, since I don't need to use it after getting
the result_list.

I tried using Python's del statement, and
spark.sparkContext._gateway.detach(model._java_obj)
as
https://stackoverflow.com/questions/58759929/how-to-free-the-memory-taken-by-a-pyspark-model-javamodel
suggested. But neither worked.

Is there anyway to unload or delete the loaded w2v model in Spark or
Pyspark?

Really appreciate for any reply and help.

Best,
Zhefu


Re: [ANNOUNCE] Announcing Apache Spark 2.4.4

2019-09-02 Thread Wesley Peng




on 2019/9/2 5:54, Dongjoon Hyun wrote:

We are happy to announce the availability of Spark 2.4.4!

Spark 2.4.4 is a maintenance release containing stability fixes. This
release is based on the branch-2.4 maintenance branch of Spark. We strongly
recommend all 2.4 users to upgrade to this stable release.


That's awesome. thanks for the work.

regards.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to work around NoOffsetForPartitionException when using Spark Streaming

2018-06-01 Thread Martin Peng
Hi,

We see below exception when using Spark Kafka streaming 0.10 on a normal
Kafka topic. Not sure why the offset is missing in ZK, but since Spark streaming
overrides the offset reset policy to none in the code, I cannot set the
reset policy to latest (I don't really care about data loss now).

Is there any quick way to fix the missing offset or work around this?

Thanks,
Martin

1/06/2018 17:11:02: ERROR:the type of error is
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition:
elasticsearchtopicrealtimereports-97
01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for
partition: elasticsearchtopicrealtimereports-97
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248)
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.immutable.List.map(List.scala:285)
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)

spark jdbc postgres query results don't match those of postgres query

2018-03-29 Thread Kevin Peng
I am running into a weird issue in Spark 1.6, which I was wondering if
anyone has encountered before. I am running a simple select query from
spark using a jdbc connection to postgres:

val POSTGRES_DRIVER: String = "org.postgresql.Driver"
val srcSql = """select total_action_value, last_updated
  from fb_fact_no_seg_20180123 where ad_id = '23842688418150437'"""
val r = sqlContext.read.format("jdbc").options(Map(
  "url" -> jdbcUrl,
  "dbtable" -> s"($srcSql) as src",
  "driver" -> POSTGRES_DRIVER
)).load().coalesce(1).cache()
r.show

+------------------+--------------------+
|total_action_value|        last_updated|
+------------------+--------------------+
|         2743.3301|2018-02-06 00:18:...|
+------------------+--------------------+

From above you see that the result is 2743.3301, but when I run the same
query directly in postgres I get a slightly different answer:

select total_action_value, last_updated from fb_fact_no_seg_20180123
where ad_id = '23842688418150437';

 total_action_value |    last_updated
--------------------+---------------------
            2743.33 | 2018-02-06 00:18:08

As you can see from above, the value is 2743.33. So why is the result coming
from spark off by .0001; basically, where is the .0001 coming from, since in
postgres the decimal value is .33?

Thanks,
KP
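
One plausible explanation, assuming the column is surfaced to Spark as a 32-bit FloatType (e.g. a postgres `real` column over JDBC; the table DDL isn't shown, so this is an assumption): 2743.33 is not exactly representable in single precision, and the nearest float32 value prints as 2743.3301. This is easy to demonstrate without Spark:

```python
import struct

# Round-trip 2743.33 through a 32-bit float, which is what a FloatType column
# stores. The nearest representable float32 value is 2743.330078125, which
# displays as 2743.3301 when shown with ~8 significant digits.
as_float32 = struct.unpack("f", struct.pack("f", 2743.33))[0]
print(as_float32)  # 2743.330078125
```

If that is the cause, reading the column as a postgres `numeric`/`decimal` (surfaced as Spark's DecimalType) would preserve the exact value.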


Re: Spark Job crash due to File Not found when shuffle intermittently

2017-07-25 Thread Martin Peng
cool~ Thanks Kang! I will check and let you know.
Sorry for the delay, as there is an urgent customer issue today.

Best
Martin

2017-07-24 22:15 GMT-07:00 周康 <zhoukang199...@gmail.com>:

> * If the file exists but is a directory rather than a regular file, does
> * not exist but cannot be created, or cannot be opened for any other
> * reason then a FileNotFoundException is thrown.
>
> After searching into FileOutputStream I saw this annotation. So you can check the
> executor node first (maybe no permission, no space left, or not enough file
> descriptors).
>
>
> 2017-07-25 13:05 GMT+08:00 周康 <zhoukang199...@gmail.com>:
>
>> You can also check whether there is enough space left on the executor node
>> to store the shuffle files.
>>
>> 2017-07-25 13:01 GMT+08:00 周康 <zhoukang199...@gmail.com>:
>>
>>> First, Spark will handle task failures, so if the job ended normally this
>>> error can be ignored.
>>> Second, when using BypassMergeSortShuffleWriter, it will first write the
>>> data file and then write an index file.
>>> You can check for "Failed to delete temporary index file at" or "fail to
>>> rename file" in the related executor node's log file.
>>>
>>> 2017-07-25 0:33 GMT+08:00 Martin Peng <wei...@gmail.com>:
>>>
>>>> Can anyone share some insights about this issue?
>>>>
>>>> Thanks
>>>> Martin
>>>>
>>>> 2017-07-21 18:58 GMT-07:00 Martin Peng <wei...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have several Spark jobs, including both batch and streaming jobs,
>>>>> that process the system logs and analyze them. We are using Kafka as
>>>>> the pipeline to connect the jobs.
>>>>>
>>>>> After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found
>>>>> some of the jobs (both batch and streaming) throw the below exceptions
>>>>> randomly (either after several hours of running, or after just 20
>>>>> minutes). Can anyone give me some suggestions about how to figure out
>>>>> the real root cause? (The Google results do not look very useful...)
>>>>>
>>>>> Thanks,
>>>>> Martin
>>>>>
>>>>> 00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task
>>>>> 60.0 in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
>>>>> java.io.FileNotFoundException: /mnt/mesos/work_dir/slaves/201
>>>>> 60924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a
>>>>> -4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-
>>>>> 4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c356
>>>>> 43e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
>>>>> (No such file or directory)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native
>>>>> Method)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.open(
>>>>> FileOutputStream.java:270)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>>>>> 00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.shuffle.Index
>>>>> ShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlo
>>>>> ckResolver.scala:144)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.shuffle.sort.
>>>>> BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWri
>>>>> ter.java:128)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Shu
>>>>> ffleMapTask.runTask(ShuffleMapTask.scala:96)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Shu
>>>>> ffleMapTask.runTask(ShuffleMapTask.scala:53)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.scheduler.Tas
>>>>> k.run(Task.scala:99)
>>>>> 00:30:04,510 WARN  - at org.apache.spark.executor.Exec
>>>>> utor$TaskRunner.run(Executor.scala:282)
>>>>> 00:30:04,510 WARN  - at java.util.concurrent.ThreadPoo
>>>>> lExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> 00:30:04,510 WARN  - at java.util.concurrent.ThreadPoo
>>>>> lExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> 00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)

Re: Spark Job crash due to File Not found when shuffle intermittently

2017-07-24 Thread Martin Peng
Can anyone share some insights about this issue?

Thanks
Martin

2017-07-21 18:58 GMT-07:00 Martin Peng <wei...@gmail.com>:

> Hi,
>
> I have several Spark jobs, including both batch and streaming jobs, that
> process the system logs and analyze them. We are using Kafka as the
> pipeline to connect the jobs.
>
> After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found some
> of the jobs (both batch and streaming) throw the below exceptions randomly
> (either after several hours of running, or after just 20 minutes). Can
> anyone give me some suggestions about how to figure out the real root
> cause? (The Google results do not look very useful...)
>
> Thanks,
> Martin
>
> 00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
> in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
> java.io.FileNotFoundException: /mnt/mesos/work_dir/slaves/
> 20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-
> 8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-
> 27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-
> c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
> (No such file or directory)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.open(
> FileOutputStream.java:270)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> 00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> 00:30:04,510 WARN  - at org.apache.spark.shuffle.
> IndexShuffleBlockResolver.writeIndexFileAndCommit(
> IndexShuffleBlockResolver.scala:144)
> 00:30:04,510 WARN  - at org.apache.spark.shuffle.sort.
> BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> 00:30:04,510 WARN  - at org.apache.spark.scheduler.
> Task.run(Task.scala:99)
> 00:30:04,510 WARN  - at org.apache.spark.executor.
> Executor$TaskRunner.run(Executor.scala:282)
> 00:30:04,510 WARN  - at java.util.concurrent.
> ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 00:30:04,510 WARN  - at java.util.concurrent.
> ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)
>
> 00:30:04,580 INFO  - Driver stacktrace:
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1435)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1423)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1422)
> 00:30:04,580 INFO  - scala.collection.mutable.
> ResizableArray$class.foreach(ResizableArray.scala:59)
> 00:30:04,580 INFO  - scala.collection.mutable.ArrayBuffer.foreach(
> ArrayBuffer.scala:48)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1422)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.
> handleTaskSetFailed(DAGScheduler.scala:802)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.
> DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> 00:30:04,580 INFO  - org.apache.spark.util.EventLoop$$anon$1.run(
> EventLoop.scala:48)
> 00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.runJob(
> DAGScheduler.scala:628)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1918)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1931)
> 00:30:04,580 INFO  - org.apache.spark.SparkContext.
> runJob(SparkContext.scala:1944)
> 00:30:04,580 INFO  - org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.
> scala:1353)
> 00:30:04,580 INFO  - org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> 00:30:04,580 INFO  - or

Spark Job crash due to File Not found when shuffle intermittently

2017-07-21 Thread Martin Peng
Hi,

I have several Spark jobs, including both batch and streaming jobs, that
process the system logs and analyze them. We are using Kafka as the pipeline
to connect the jobs.

After upgrading to Spark 2.1.0 + Spark Kafka Streaming 010, I found some of
the jobs (both batch and streaming) throw the below exceptions randomly
(either after several hours of running, or after just 20 minutes). Can
anyone give me some suggestions about how to figure out the real root cause?
(The Google results do not look very useful...)

Thanks,
Martin

00:30:04,510 WARN  - 17/07/22 00:30:04 WARN TaskSetManager: Lost task 60.0
in stage 1518490.0 (TID 338070, 10.133.96.21, executor 0):
java.io.FileNotFoundException:
/mnt/mesos/work_dir/slaves/20160924-021501-274760970-5050-7646-S2/frameworks/40aeb8e5-e82a-4df9-b034-8815a7a7564b-2543/executors/0/runs/fd15c15d-2511-4f37-a106-27431f583153/blockmgr-a0e0e673-f88b-4d12-a802-c35643e6c6b2/33/shuffle_2090_60_0.index.b66235be-79be-4455-9759-1c7ba70f91f6
(No such file or directory)
00:30:04,510 WARN  - at java.io.FileOutputStream.open0(Native Method)
00:30:04,510 WARN  - at
java.io.FileOutputStream.open(FileOutputStream.java:270)
00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
00:30:04,510 WARN  - at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
00:30:04,510 WARN  - at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:128)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
00:30:04,510 WARN  - at
org.apache.spark.scheduler.Task.run(Task.scala:99)
00:30:04,510 WARN  - at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
00:30:04,510 WARN  - at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
00:30:04,510 WARN  - at java.lang.Thread.run(Thread.java:748)

00:30:04,580 INFO  - Driver stacktrace:
00:30:04,580 INFO  - org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
00:30:04,580 INFO  -
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
00:30:04,580 INFO  - scala.Option.foreach(Option.scala:257)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
00:30:04,580 INFO  -
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
00:30:04,580 INFO  -
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
00:30:04,580 INFO  -
org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
00:30:04,580 INFO  - org.apache.spark.rdd.RDD.take(RDD.scala:1326)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1461)
00:30:04,580 INFO  -
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
00:30:04,580 INFO  -

The stability of Spark Stream Kafka 010

2017-06-29 Thread Martin Peng
Hi,

We planned to upgrade our Spark Kafka library from 0.8.1 to 0.10 to simplify
our infrastructure code logic. Does anybody know when the 0-10 integration
will graduate from experimental to stable?
May I use this 0-10 version together with Spark 1.5.1?

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Thanks
Martin


Re: Setting Optimal Number of Spark Executor Instances

2017-03-15 Thread Kevin Peng
Mohini,

We set that parameter before we went and played with the number of
executors and that didn't seem to help at all.

Thanks,

KP

On Tue, Mar 14, 2017 at 3:37 PM, mohini kalamkar 
wrote:

> Hi,
>
> try using this parameter --conf spark.sql.shuffle.partitions=1000
>
> Thanks,
> Mohini
>
> On Tue, Mar 14, 2017 at 3:30 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am currently on Spark 1.6, and I was doing a SQL join on two tables
>> that are over 100 million rows each. I noticed that it was spawning 3+
>> tasks (according to the progress meter that we are seeing show up).  We
>> tried coalesce, repartition, and shuffle partitions to drop the number of
>> tasks, because we were getting timeouts due to the number of tasks being
>> spawned, but those operations did not seem to reduce the number of tasks.
>> The solution we came up with was to set the number of executors to 50
>> (--num-executors=50), and it looks like that spawned 200 active tasks,
>> but the total number of tasks remained the same.  Was wondering if anyone
>> knows what is going on?  Is there an optimal number of executors?  I was
>> under the impression that the default dynamic allocation would pick the
>> optimal number of executors for us and that this situation wouldn't
>> happen.  Is there something I am missing?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Setting-Optimal-Number-of-Spark-Execut
>> or-Instances-tp28493.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Thanks & Regards,
> Mohini Kalamkar
> M: +1 310 567 9329
>
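For context on the numbers in this thread: in Spark SQL the total task count of a shuffle stage comes from `spark.sql.shuffle.partitions` (default 200), while `--num-executors` only controls how many of those tasks can run concurrently (executors times cores per executor). A sketch of submitting with both knobs (`my_job.py` and the core count are placeholders):

```shell
# Total shuffle tasks ~= spark.sql.shuffle.partitions; concurrently running
# tasks ~= num-executors * executor-cores (e.g. 50 * 4 = 200 active tasks).
spark-submit \
  --num-executors 50 \
  --executor-cores 4 \
  --conf spark.sql.shuffle.partitions=1000 \
  my_job.py
```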


Re: udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
df:
---------
a | b | c
---------
1 | m | n
1 | x | j
2 | m | x
...


import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

def my_zip(b, c):
    return dict(zip(b, c))

# note: StringType (not "StingType"), and the udf needs its own name
my_zip_udf = F.udf(my_zip, MapType(StringType(), StringType(), True), True)

df.groupBy('a').agg(my_zip_udf(F.collect_list('b'),
                               F.collect_list('c')).alias('named_list'))
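Independent of Spark, the heavy lifting is done by `collect_list`; the UDF body is plain Python. A minimal sketch of what it produces for the sample group `a == 1` above:

```python
def my_zip(keys, values):
    # Pair two parallel per-group lists into a dict, as the UDF does.
    return dict(zip(keys, values))

# For group a == 1, collecting the other two columns would yield:
print(my_zip(["m", "x"], ["n", "j"]))  # {'m': 'n', 'x': 'j'}
```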



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
BTW, I am using Spark 1.6.1.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
Hi, 

is there a way to write a UDF in PySpark that supports agg()?


I searched all over the docs and the internet, and tested it out... some
say yes, some say no.

And when I try those "yes" code examples, they just complain with:

AnalysisException: u"expression 'pythonUDF' is neither present in the group
by, nor is it an aggregate function. Add to group by or wrap in first() (or
first_value) if you don't care which value you get.;"



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Mike,

It looks like you are right.  The results seem to be fine.  It looks like I
messed up on the filtering clause.

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE (s.date >= '2016-01-03' OR s.date IS NULL) AND (d.date >=
'2016-01-03' OR d.date IS NULL)").count()
res2: Long = 53042
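The `OR ... IS NULL` rewrite works because it keeps the NULL-padded rows that the outer join adds. A standalone sketch of the idea with sqlite3 and made-up tables (not the proprietary data from this thread):

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE s (ad TEXT, dt TEXT)")
cur.execute("CREATE TABLE d (ad TEXT, dt TEXT)")
cur.executemany("INSERT INTO s VALUES (?, ?)",
                [("a", "2016-01-04"), ("b", "2016-01-04")])
cur.executemany("INSERT INTO d VALUES (?, ?)",
                [("a", "2016-01-04"), ("c", "2016-01-04")])

# Filtering d.dt directly discards the NULL-padded row for s.ad = 'b':
strict = cur.execute(
    "SELECT COUNT(*) FROM s LEFT JOIN d ON s.ad = d.ad "
    "WHERE d.dt >= '2016-01-03'").fetchone()[0]
# Adding OR ... IS NULL keeps it, preserving outer-join semantics:
null_safe = cur.execute(
    "SELECT COUNT(*) FROM s LEFT JOIN d ON s.ad = d.ad "
    "WHERE d.dt >= '2016-01-03' OR d.dt IS NULL").fetchone()[0]
print(strict, null_safe)  # 1 2
```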

Davies, Cesar, Gourav,

Thanks for the support.

KP

On Tue, May 3, 2016 at 11:26 AM, Michael Segel <msegel_had...@hotmail.com>
wrote:

> Silly question?
>
> If you change the predicate to
> ( s.date >= ‘2016-01-03’ OR s.date IS NULL )
> AND
> (d.date >= ‘2016-01-03’ OR d.date IS NULL)
>
> What do you get?
>
> Sorry if the syntax isn’t 100% correct. The idea is to not drop null
> values from the query.
> I would imagine that this shouldn’t kill performance since its most likely
> a post join filter on the result set?
> (Or is that just a Hive thing?)
>
> -Mike
>
> > On May 3, 2016, at 12:42 PM, Davies Liu <dav...@databricks.com> wrote:
> >
> > Bingo, the two predicates s.date >= '2016-01-03' AND d.date >=
> > '2016-01-03' are the root cause;
> > they filter out all the nulls from the outer join, which then has the
> > same result as an inner join.
> >
> > In Spark 2.0, we turn these join into inner join actually.
> >
> > On Tue, May 3, 2016 at 9:50 AM, Cesar Flores <ces...@gmail.com> wrote:
> >> Hi
> >>
> >> Have you tried the joins without the where clause? When you use it you
> >> are filtering out all the rows with null columns in those fields. In
> >> other words, you are doing an inner join in all your queries.
> >>
> >> On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> >> wrote:
> >>>
> >>> Hi Kevin,
> >>>
> >>> Having given it a first look I do think that you have hit something
> here
> >>> and this does not look quite fine. I have to work on the multiple AND
> >>> conditions in ON and see whether that is causing any issues.
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng <kpe...@gmail.com> wrote:
> >>>>
> >>>> Davies,
> >>>>
> >>>> Here is the code that I am typing into the spark-shell along with the
> >>>> results (my question is at the bottom):
> >>>>
> >>>> val dps =
> >>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>>> "true").load("file:///home/ltu/dps_csv/")
> >>>> val swig =
> >>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>>> "true").load("file:///home/ltu/swig_csv/")
> >>>>
> >>>> dps.count
> >>>> res0: Long = 42694
> >>>>
> >>>> swig.count
> >>>> res1: Long = 42034
> >>>>
> >>>>
> >>>> dps.registerTempTable("dps_pin_promo_lt")
> >>>> swig.registerTempTable("swig_pin_promo_lt")
> >>>>
> >>>> sqlContext.sql("select * from dps_pin_promo_lt where date >
> >>>> '2016-01-03'").count
> >>>> res4: Long = 42666
> >>>>
> >>>> sqlContext.sql("select * from swig_pin_promo_lt where date >
> >>>> '2016-01-03'").count
> >>>> res5: Long = 34131
> >>>>
> >>>> sqlContext.sql("select distinct date, account, ad from
> dps_pin_promo_lt
> >>>> where date > '2016-01-03'").count
> >>>> res6: Long = 42533
> >>>>
> >>>> sqlContext.sql("select distinct date, account, ad from
> swig_pin_promo_lt
> >>>> where date > '2016-01-03'").count
> >>>> res7: Long = 34131
> >>>>
> >>>>
> >>>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  ,
> d.account
> >>>> AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> >>>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> >>>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.accoun

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

What exactly do you mean in regard to Spark 2.0 turning these joins into
inner joins?  Does this mean that Spark SQL won't support where clauses on
outer joins?


Cesar & Gourav,

When running the queries without the where clause it works as expected.  I
am pasting my results below:
val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res5: Long = 60919


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res6: Long = 42034


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad)").count()
res7: Long = 42694

Thanks,

KP


On Tue, May 3, 2016 at 10:42 AM, Davies Liu <dav...@databricks.com> wrote:

> Bingo, the two predicates s.date >= '2016-01-03' AND d.date >=
> '2016-01-03' are the root cause;
> they filter out all the nulls from the outer join, which then has the
> same result as an inner join.
>
> In Spark 2.0, we turn these join into inner join actually.
>
> On Tue, May 3, 2016 at 9:50 AM, Cesar Flores <ces...@gmail.com> wrote:
> > Hi
> >
> > Have you tried the joins without the where clause? When you use it you
> > are filtering out all the rows with null columns in those fields. In
> > other words, you are doing an inner join in all your queries.
> >
> > On Tue, May 3, 2016 at 11:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> > wrote:
> >>
> >> Hi Kevin,
> >>
> >> Having given it a first look I do think that you have hit something here
> >> and this does not look quite fine. I have to work on the multiple AND
> >> conditions in ON and see whether that is causing any issues.
> >>
> >> Regards,
> >> Gourav Sengupta
> >>
> >> On Tue, May 3, 2016 at 8:28 AM, Kevin Peng <kpe...@gmail.com> wrote:
> >>>
> >>> Davies,
> >>>
> >>> Here is the code that I am typing into the spark-shell along with the
> >>> results (my question is at the bottom):
> >>>
> >>> val dps =
> >>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>> "true").load("file:///home/ltu/dps_csv/")
> >>> val swig =
> >>> sqlContext.read.format("com.databricks.spark.csv").option("header",
> >>> "true").load("file:///home/ltu/swig_csv/")
> >>>
> >>> dps.count
> >>> res0: Long = 42694
> >>>
> >>> swig.count
> >>> res1: Long = 42034
> >>>
> >>>
> >>> dps.registerTempTable("dps_pin_promo_lt")
> >>> swig.registerTempTable("swig_pin_promo_lt")
> >>>
> >>> sqlContext.sql("select * from dps_pin_promo_lt where date >
> >>> '2016-01-03'").count
> >>> res4: Long = 42666
> >>>
> >>> sqlContext.sql("select * from swig_pin_promo_lt where date >
> >>> '2016-01-03'").count
> >>> res5: Long = 34131
> >>>
> >>> sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
> >>> where date > '2016-01-03'").count
> >>> res6: Long = 42533
> >>>
> >>> sqlContext.sql("select distinct date, account, ad from
> swig_pin_promo_lt
> >>> where date > '2016-01-03'").count
> >>> res7: Long = 34131
> >>>
> >>>
> >>> sqlContext

Re: Weird results with Spark SQL Outer joins

2016-05-03 Thread Kevin Peng
Davies,

Here is the code that I am typing into the spark-shell along with the
results (my question is at the bottom):

val dps =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/dps_csv/")
val swig =
sqlContext.read.format("com.databricks.spark.csv").option("header",
"true").load("file:///home/ltu/swig_csv/")

dps.count
res0: Long = 42694

swig.count
res1: Long = 42034


dps.registerTempTable("dps_pin_promo_lt")
swig.registerTempTable("swig_pin_promo_lt")

sqlContext.sql("select * from dps_pin_promo_lt where date >
'2016-01-03'").count
res4: Long = 42666

sqlContext.sql("select * from swig_pin_promo_lt where date >
'2016-01-03'").count
res5: Long = 34131

sqlContext.sql("select distinct date, account, ad from dps_pin_promo_lt
where date > '2016-01-03'").count
res6: Long = 42533

sqlContext.sql("select distinct date, account, ad from swig_pin_promo_lt
where date > '2016-01-03'").count
res7: Long = 34131


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res9: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res10: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res11: Long = 23809


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
AS d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
= d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
res12: Long = 23809



From my results above, we notice that the counts of distinct values based
on the join criteria and filter criteria for each individual table are
located at res6 and res7.  My question is: why is the outer join producing
fewer rows than the smaller table?  If there are no matches, it should
still bring in each row as part of the outer join.  For the full and right
outer joins I am expecting to see a minimum of res6 rows, but I get fewer;
is there something specific that I am missing here?  I am expecting that
the full outer join would give me the union of the two table sets, so I am
expecting at least 42533 rows, not 23809.
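The effect described here can be reproduced outside Spark with any SQL engine: a WHERE predicate on a column of the NULL-padded side silently turns an outer join into an inner join. A small sqlite3 sketch with made-up tables:

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE s (ad TEXT, dt TEXT)")
cur.execute("CREATE TABLE d (ad TEXT, dt TEXT)")
cur.executemany("INSERT INTO s VALUES (?, ?)",
                [("a", "2016-01-04"), ("b", "2016-01-04")])
cur.executemany("INSERT INTO d VALUES (?, ?)",
                [("a", "2016-01-04"), ("c", "2016-01-04")])

# The plain left outer join keeps the unmatched row (s.ad = 'b', d.* NULL):
outer = cur.execute(
    "SELECT COUNT(*) FROM s LEFT JOIN d ON s.ad = d.ad").fetchone()[0]
# A WHERE clause on d.dt drops that NULL row, so the count collapses to
# the inner-join count -- the behavior observed above:
outer_filtered = cur.execute(
    "SELECT COUNT(*) FROM s LEFT JOIN d ON s.ad = d.ad "
    "WHERE d.dt >= '2016-01-03'").fetchone()[0]
inner = cur.execute(
    "SELECT COUNT(*) FROM s JOIN d ON s.ad = d.ad "
    "WHERE d.dt >= '2016-01-03'").fetchone()[0]
print(outer, outer_filtered, inner)  # 2 1 1
```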


Gourav,

I just ran this result set on a new session with slightly newer data...
still seeing those results.



Thanks,

KP


On Mon, May 2, 2016 at 11:16 PM, Davies Liu <dav...@databricks.com> wrote:

> as @Gourav said, all the joins with different join types show the same
> results,
> which means that every row from the left matched at least one row from
> the right,
> and every row from the right matched at least one row from the left, even
> though the number of rows on the left does not equal that on the right.
>
> This is correct result.
>
> On Mon, May 2, 2016 at 2:13 PM, Kevin Peng <kpe...@gmail.com> wrote:
> > Yong,
> >
> > Sorry, let me explain my deduction; it is going to be difficult to get
> > sample data out since the dataset I am using is proprietary.
> >
> > From the above set of queries (the ones mentioned in the comments
> > above), both the inner and outer joins are producing the same counts.
> > They are basically pulling out selected columns from the query, and
> > there is no roll-up happening or anything else that would make it
> > suspicious that there is any difference besides the type of join.  The
> > tables are matched on three keys that are present in both tables (ad,
> > account, and date); on top of this they are filtered by date being
> > above 2016-01-03.  Since all the joins are producing the same counts,
> > the natural suspicion is that the tables are identical, but when I run
> > the following two queries:
> >
> > scala> sqlContext.sql("select * from swig_pin_prom

Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Yong,

Sorry, let me explain my deduction; it is going to be difficult to get
sample data out since the dataset I am using is proprietary.

From the above set of queries (the ones mentioned in the comments above),
both the inner and outer joins are producing the same counts.  They are
basically pulling out selected columns from the query, and there is no
roll-up happening or anything else that would make it suspicious that there
is any difference besides the type of join.  The tables are matched on
three keys that are present in both tables (ad, account, and date); on top
of this they are filtered by date being above 2016-01-03.  Since all the
joins are producing the same counts, the natural suspicion is that the
tables are identical, but when I run the following two queries:

scala> sqlContext.sql("select * from swig_pin_promo_lt where date
>='2016-01-03'").count

res14: Long = 34158

scala> sqlContext.sql("select * from dps_pin_promo_lt where date
>='2016-01-03'").count

res15: Long = 42693


The above two queries filter the data on the same date (2016-01-03) used by
the joins, and you can see the row counts between the two tables are
different, which is why I suspect something is wrong with the outer joins
in Spark SQL. In this situation the right and full outer joins may produce
the same results, but they should not be equal to the left join, and
definitely not to the inner join; unless I am missing something.


Side note: In my haste response above I posted the wrong counts for
dps.count, the real value is res16: Long = 42694


Thanks,


KP



On Mon, May 2, 2016 at 12:50 PM, Yong Zhang  wrote:

> We are still not sure what the problem is, since you cannot show us some
> example data.
>
> For dps with 42632 rows and swig with 42034 rows: dps full outer join
> with swig on 3 columns, with additional filters, gets the same result-set
> row count as dps left outer join with swig on 3 columns with additional
> filters, which again gets the same result-set row count as dps right
> outer join with swig on 3 columns with the same additional filters.
>
> Without knowing your data, I cannot see the reason that has to be a bug in
> the spark.
>
> Am I misunderstanding your bug?
>
> Yong
>
> --
> From: kpe...@gmail.com
> Date: Mon, 2 May 2016 12:11:18 -0700
> Subject: Re: Weird results with Spark SQL Outer joins
> To: gourav.sengu...@gmail.com
> CC: user@spark.apache.org
>
>
> Gourav,
>
> I wish that were the case, but I have done a select count on each of the
> two tables individually and they return different numbers of rows:
>
>
> dps.registerTempTable("dps_pin_promo_lt")
> swig.registerTempTable("swig_pin_promo_lt")
>
>
> dps.count()
> RESULT: 42632
>
>
> swig.count()
> RESULT: 42034
>
> On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> This shows that both tables have matching records and no mismatches.
> Therefore obviously you have the same results irrespective of whether you
> use right or left join.
>
> I think that there is no problem here, unless I am missing something.
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>
> Also, the results of the inner query produced the same results:
> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
> AS
> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad
> =
> d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
> RESULT:23747
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Gourav,

I wish that were the case, but I have done a select count on each of the two
tables individually and they return different numbers of rows:


dps.registerTempTable("dps_pin_promo_lt")

swig.registerTempTable("swig_pin_promo_lt")


dps.count()

RESULT: 42632


swig.count()

RESULT: 42034

On Mon, May 2, 2016 at 11:55 AM, Gourav Sengupta 
wrote:

> This shows that both tables have matching records and no mismatches.
> Therefore obviously you have the same results irrespective of whether you
> use right or left join.
>
> I think that there is no problem here, unless I am missing something.
>
> Regards,
> Gourav
>
> On Mon, May 2, 2016 at 7:48 PM, kpeng1  wrote:
>
>> Also, the results of the inner query produced the same results:
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861p26863.html
>>
>>
>


Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread Kevin Peng
Gourav,

Apologies.  I edited my post with this information:
Spark version: 1.6
Results are from the Spark shell
OS: Linux version 2.6.32-431.20.3.el6.x86_64 (
mockbu...@c6b9.bsys.dev.centos.org) (gcc version 4.4.7 20120313 (Red Hat
4.4.7-4) (GCC) ) #1 SMP Thu Jun 19 21:14:45 UTC 2014

Thanks,

KP

On Mon, May 2, 2016 at 11:05 AM, Gourav Sengupta 
wrote:

> Hi,
>
> As always, can you please write down details regarding your SPARK cluster
> - the version, OS, IDE used, etc?
>
> Regards,
> Gourav Sengupta
>
> On Mon, May 2, 2016 at 5:58 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am running into a weird result with Spark SQL outer joins.  The results
>> for all of them seem to be the same, which does not make sense given the
>> data.  Here are the queries that I am running, with the results:
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> RESULT:23747
>>
>> sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account
>> AS
>> d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
>> d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
>> dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND
>> s.ad =
>> d.ad) WHERE s.date >= '2016-01-03' AND d.date >=
>> '2016-01-03'").count()
>> RESULT: 23747
>>
>> I was wondering if someone had encountered this issue before.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Weird-results-with-Spark-SQL-Outer-joins-tp26861.html
>>
>>
>


Re: println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread Kevin Peng
Ted,

What triggerAndWait does is perform a REST call to a specified URL and then
wait until a field in the JSON status message returned by that URL says
complete. The issue is that I put a println at the very top of the method
and it doesn't get printed out. I know println itself isn't the problem,
because an exception that I throw further down the line is what I am
currently getting; yet none of the printlns along the way show up:


  def triggerAndWait(url: String, pollInterval: Int = 1000 * 30,
      timeOut: Int = 1000 * 60 * 60, connectTimeout: Int = 3,
      readTimeout: Int = 3, requestMethod: String = "GET"): Boolean = {

    println("Entering triggerAndWait function - url: " + url +
      " pollInterval: " + pollInterval.toString() + " timeOut: " +
      timeOut.toString() + " connectionTimeout: " +
      connectTimeout.toString() + " readTimeout: " + readTimeout.toString() +
      " requestMethod: " + requestMethod)

    ...


Thanks,


KP

On Mon, Mar 28, 2016 at 1:52 PM, Ted Yu  wrote:

> Can you describe what gets triggered by triggerAndWait?
>
> Cheers
>
> On Mon, Mar 28, 2016 at 1:39 PM, kpeng1  wrote:
>
>> Hi All,
>>
>> I am currently trying to debug a spark application written in scala.  I
>> have
>> a main method:
>>   def main(args: Array[String]) {
>> ...
>>  SocialUtil.triggerAndWait(triggerUrl)
>> ...
>>
>> The SocialUtil object is included in a separate jar.  I launched the
>> spark-submit command using --jars, passing the SocialUtil jar.  Inside the
>> triggerAndWait function I have a println statement that is the first thing
>> in the method, but its output doesn't appear.  All printlns that happen
>> directly inside the main function do appear, though.  Does anyone know
>> what is going on here and how I can make the printlns in the SocialUtil
>> object appear?
>>
>> Thanks,
>>
>> KP
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/println-not-appearing-in-libraries-when-running-job-using-spark-submit-master-local-tp26617.html
>>
>>
>


ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema, apply it to an RDD of
Rows, and save the resulting DataFrame as a parquet file.
Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:
val gameId= Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)
  val vVersionId = line(5)
  val vUin = line(6)
  val vClientIp = line(7)
  val vZoneId = line(8)
  val dtCreateTime = line(9)
  val iFeeFlag = Long.valueOf(line(10))
  val vLoginWay = line(11)

  return Row(gameId, accountType, worldId, dtEventTime, iEventId,
vVersionId, vUin, vClientIp,
 vZoneId, dtCreateTime, vZoneId, dtCreateTime, iFeeFlag,
vLoginWay)


ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala

2015-08-21 Thread Emma Boya Peng
Hi,

I was trying to programmatically specify a schema, apply it to an RDD of
Rows, and save the resulting DataFrame as a parquet file, but on the last
step I got java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Long.

Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:
val gameId= Long.valueOf(line(0))
  val accountType = Long.valueOf(line(1))
  val worldId = Long.valueOf(line(2))
  val dtEventTime = line(3)
  val iEventId = line(4)

  return Row(gameId, accountType, worldId, dtEventTime, iEventId)

2. Generated the schema:
 return StructType(Array(StructField("GameId", LongType, true),
StructField("AccountType", LongType, true), StructField("WorldId",
LongType, true), StructField("dtEventTime", StringType, true),
StructField("iEventId", StringType, true)))

3. Applied the schema to the RDD of Rows:
val schemaRdd = sqlContext.createDataFrame(rowRdd, schema)

4. Saved schemaRdd as a parquet file:
 schemaRdd.saveAsParquetFile(dst + "/" + tableName + ".parquet")

However, it gave me a ClassCastException on step 4 (the DataFrame, i.e.
schemaRdd, can be correctly printed out according to the specified schema).

Thank you for your help!

Best,
Emma

Stack trace of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage
1.0 (TID 12, 10-4-28-24): java.lang.ClassCastException: java.lang.String
cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at
org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:357)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:338)
at
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:324)
at
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org
$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
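One thing worth checking (an editorial conjecture, not confirmed in this thread): the first, truncated copy of this message shows a Row that repeats vZoneId and dtCreateTime, and any positional mismatch between the Row's values and the StructType puts a String into a LongType slot, which is exactly where getLong's unboxToLong cast fails. A plain-Python sketch of that failure mode, with invented data:

```python
# Sketch (invented data, plain Python): values in a Row must match the
# StructType positionally. If one input line slips through untyped -- for
# example a header row, or a Row built with columns in a different order
# than the schema -- a string lands in a LongType slot and the Parquet
# writer's getLong() cast fails, much like the exception above.

schema = [("GameId", int), ("AccountType", int), ("WorldId", int),
          ("dtEventTime", str), ("iEventId", str)]

def check_row(row):
    """Raise if any value's type disagrees with its schema slot."""
    for (name, typ), value in zip(schema, row):
        if not isinstance(value, typ):
            raise TypeError(f"{name}: expected {typ.__name__}, "
                            f"got {type(value).__name__} ({value!r})")

check_row([1001, 1, 2, "2015-08-21 10:00:00", "login"])  # aligned: fine

try:
    # e.g. a header line parsed as data: every field is still a string
    check_row(["gameid", "accounttype", "worldid", "time", "event"])
except TypeError as e:
    print("schema mismatch:", e)
```

Printing a schema-checked sample of the RDD before writing would show which field is misaligned.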


Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Hi Andrew,

Thanks a lot! Indeed, they don't start with "spark."; the following
properties are read by the driver's own code rather than by Spark conf:

--conf spooky.root=s3n://spooky- \
--conf spooky.checkpoint=s3://spooky-checkpoint \

This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to set
the same properties?

Yours Peng

On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From the
 warning it looks like the config you are trying to set does not start with
 the prefix "spark.". What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

  In Spark 1.3.x, the driver's system properties could be set via the --conf
  option, which was shared between Spark properties and system properties.

  In Spark 1.4.0 this feature was removed; the driver instead logs the
  following warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

  How do I set the driver's system properties in 1.4.0? Is there a reason it
  was removed without a deprecation warning?

  Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html





Re: [Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
Thanks all for your information. Andrew, I dug out one of your old posts,
which is relevant:

http://apache-spark-user-list.1001560.n3.nabble.com/little-confused-about-SPARK-JAVA-OPTS-alternatives-td5798.html

But it didn't mention how to supply properties that don't start with "spark.".
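For later readers: two workarounds commonly suggested for this situation are sketched below (not verified here against Spark 1.4.0; the property names and truncated S3 paths are the poster's):

```shell
# 1. Pass the properties as plain JVM system properties to the driver:
spark-submit \
  --driver-java-options "-Dspooky.root=s3n://spooky- -Dspooky.checkpoint=s3://spooky-checkpoint" \
  ...

# 2. Or namespace them under "spark." so --conf accepts them, and read them
#    back from SparkConf inside the application:
spark-submit \
  --conf spark.spooky.root=s3n://spooky- \
  --conf spark.spooky.checkpoint=s3://spooky-checkpoint \
  ...
```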

On 12 June 2015 at 19:39, Ted Yu yuzhih...@gmail.com wrote:

 This is the SPARK JIRA which introduced the warning:

 [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties
 in spark-shell and spark-submit

 On Fri, Jun 12, 2015 at 4:34 PM, Peng Cheng rhw...@gmail.com wrote:

 Hi Andrew,

 Thanks a lot! Indeed, they don't start with "spark."; the following
 properties are read by the driver's own code rather than by Spark conf:

 --conf spooky.root=s3n://spooky- \
 --conf spooky.checkpoint=s3://spooky-checkpoint \

 This used to work from Spark 1.0.0 to 1.3.1. Do you know the new way to
 set the same properties?

 Yours Peng

 On 12 June 2015 at 14:20, Andrew Or and...@databricks.com wrote:

 Hi Peng,

 Setting properties through --conf should still work in Spark 1.4. From
 the warning it looks like the config you are trying to set does not start
 with the prefix "spark.". What is the config that you are trying to set?

 -Andrew

 2015-06-12 11:17 GMT-07:00 Peng Cheng pc...@uow.edu.au:

 In Spark 1.3.x, the driver's system properties could be set via the --conf
 option, which was shared between Spark properties and system properties.

 In Spark 1.4.0 this feature was removed; the driver instead logs the
 following warning:

 Warning: Ignoring non-spark config property: xxx.xxx=v

 How do I set the driver's system properties in 1.4.0? Is there a reason it
 was removed without a deprecation warning?

 Thanks a lot for your advice.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html







[Spark 1.4.0]How to set driver's system property using spark-submit options?

2015-06-12 Thread Peng Cheng
In Spark 1.3.x, the driver's system properties could be set via the --conf
option, which was shared between Spark properties and system properties.

In Spark 1.4.0 this feature was removed; the driver instead logs the
following warning:

Warning: Ignoring non-spark config property: xxx.xxx=v

How do I set the driver's system properties in 1.4.0? Is there a reason it
was removed without a deprecation warning?

Thanks a lot for your advice.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-How-to-set-driver-s-system-property-using-spark-submit-options-tp23298.html



Re: S3NativeFileSystem inefficient implementation when calling sc.textFile

2015-05-21 Thread Peng Cheng
I stumbled upon this thread, and I conjecture that it may affect restoring a
checkpointed RDD as well:

http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-gt-10-hour-between-stage-latency-td22925.html#a22928

In my case I have 1600+ fragmented checkpoint files, and reading all the
metadata takes a staggering 11 hours.

If this is really the cause then it's an obvious handicap, as a checkpointed
RDD already has all file partition information available and shouldn't need
to read it from S3 into the driver again (which causes a single point of
failure and a bottleneck).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3NativeFileSystem-inefficient-implementation-when-calling-sc-textFile-tp19841p22984.html



Re: Union of checkpointed RDD in Apache Spark has long ( 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Looks like this problem has been mentioned before:

http://qnalist.com/questions/5666463/downloads-from-s3-exceedingly-slow-when-running-on-spark-ec2

and a temporarily solution is to deploy on a dedicated EMR/S3 configuration.
I'll go for that one for a shot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22927.html



Re: Union of checkpointed RDD in Apache Spark has long ( 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Turns out the above thread is unrelated: it was caused by using s3:// instead
of s3n://, which I had already avoided in my checkpointDir configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22928.html



Re: Union of checkpointed RDD in Apache Spark has long ( 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
BTW: a thread dump of the driver's main thread looks like it is stuck
waiting on Amazon S3 bucket metadata for a long time (which may suggest
that I should move the checkpointing directory from S3 to HDFS):

Thread 1: main (RUNNABLE) 
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
sun.security.ssl.InputRecord.read(InputRecord.java:480)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223)
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22926.html



What are the likely causes of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle?

2015-04-24 Thread Peng Cheng
I'm deploying a Spark data-processing job on an EC2 cluster; the job is small
for the cluster (16 cores with 120G RAM in total), and the largest RDD has
only 76k+ rows. But it is heavily skewed in the middle (thus requiring
repartitioning), and each row has around 100k of data after serialization.
The job always gets stuck in repartitioning; namely, it constantly hits the
following errors and retries:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle

org.apache.spark.shuffle.FetchFailedException: Error in opening
FileSegmentManagedBuffer

org.apache.spark.shuffle.FetchFailedException:
java.io.FileNotFoundException: /tmp/spark-...
I've tried to identify the problem but it seems like both memory and disk
consumption of the machine throwing these errors are below 50%. I've also
tried different configurations, including:

let driver/executor memory use 60% of total memory.
let Netty prioritize the JVM shuffle buffer.
increase the shuffle streaming buffer to 128m.
use KryoSerializer and max out all of its buffers.
increase the shuffle memoryFraction to 0.4.
But none of them works. The small job always triggers the same series of
errors and maxes out its retries (up to 1000 times). How do I troubleshoot
this in such a situation?

Thanks a lot if you have any clue.
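For concreteness, the settings listed above roughly correspond to Spark 1.x flags like the following (a sketch of likely equivalents, not the poster's exact command; values not stated above are left as ...):

```shell
spark-submit \
  --conf spark.executor.memory=... \
  --conf spark.driver.memory=... \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.memoryFraction=0.4 \
  ...
```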




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-are-the-likely-causes-of-org-apache-spark-shuffle-MetadataFetchFailedException-Missing-an-outpu-tp22646.html



Re: Spark Performance on Yarn

2015-04-20 Thread Peng Cheng
I got exactly the same problem, except that I'm running on a standalone
master. Can you tell me the counterpart parameter on a standalone master for
increasing the same memory overhead?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22580.html



How to avoid “Invalid checkpoint directory” error in apache Spark?

2015-04-17 Thread Peng Cheng
I'm using Amazon EMR + S3 as my Spark cluster infrastructure. When I run a
job with periodic checkpointing (it has a long dependency tree, so
truncation by checkpointing is mandatory; each checkpoint has 320
partitions), the job stops halfway with an exception:

(On driver)
org.apache.spark.SparkException: Invalid checkpoint directory:
s3n://spooky-checkpoint/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198
at
org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
...

(On Executor)
15/04/17 22:00:14 WARN StorageService: Encountered 4 Internal Server
error(s), will retry in 800ms
15/04/17 22:00:15 WARN RestStorageService: Retrying request following error
response: PUT '/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025' --
ResponseCode: 500, ResponseStatus: Internal Server Error
...

After manually checking the checkpointed files I found that
/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025 is indeed missing
on S3. So my question is: if it is missing (perhaps due to an AWS
malfunction), why didn't Spark detect this immediately during the
checkpointing process (so it could be retried), instead of throwing an
irrecoverable error stating that the dependency tree is already lost? And
how can I avoid this situation happening again?

I don't think this problem has been addressed before, because HDFS is
assumed to be immediately consistent (unlike S3, which is eventually
consistent) and extremely resilient. However, every component has a chance
of breaking down; can you share your checkpointing best practices?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-Invalid-checkpoint-directory-error-in-apache-Spark-tp22548.html



Re: spark there is no space on the disk

2015-03-31 Thread Peng Xia
Yes, we have just modified the configuration, and everything works fine.
Thanks very much for the help.

On Thu, Mar 19, 2015 at 5:24 PM, Ted Yu yuzhih...@gmail.com wrote:

 For YARN, possibly this one ?

 property
   nameyarn.nodemanager.local-dirs/name
   value/hadoop/yarn/local/value
 /property

 Cheers

 On Thu, Mar 19, 2015 at 2:21 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 IIRC you have to set that configuration on the Worker processes (for
 standalone). The app can't override it (only for a client-mode
 driver). YARN has a similar configuration, but I don't know the name
 (shouldn't be hard to find, though).

 On Thu, Mar 19, 2015 at 11:56 AM, Davies Liu dav...@databricks.com
 wrote:
  Is it possible that `spark.local.dir` is overriden by others? The docs
 say:
 
  NOTE: In Spark 1.0 and later this will be overriden by
  SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN)
 
  On Sat, Mar 14, 2015 at 5:29 PM, Peng Xia sparkpeng...@gmail.com
 wrote:
  Hi Sean,
 
  Thank very much for your reply.
  I tried to config it from below code:
 
  sf = SparkConf().setAppName("test").set("spark.executor.memory",
  "45g").set("spark.cores.max", "62").set("spark.local.dir", "C:\\tmp")
 
  But still get the error.
  Do you know how I can config this?
 
 
  Thanks,
  Best,
  Peng
 
 
  On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:
 
  It means pretty much what it says. You ran out of space on an executor
  (not driver), because the dir used for serialization temp files is
  full (not all volumes). Set spark.local.dirs to something more
  appropriate and larger.
 
  On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com
 wrote:
   Hi
  
  
   I was running a logistic regression algorithm on a 8 nodes spark
   cluster,
   each node has 8 cores and 56 GB Ram (each node is running a windows
   system).
   And the spark installation driver has 1.9 TB capacity. The dataset
 I was
   training on are has around 40 million records with around 6600
 features.
   But
   I always get this error during the training process:
  
   Py4JJavaError: An error occurred while calling
   o70.trainLogisticRegressionModelWithLBFGS.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task
   2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in
   stage 3.0 (TID 2766,
   workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
   java.io.IOException: There is not enough space on the disk
   at java.io.FileOutputStream.writeBytes(Native Method)
   at java.io.FileOutputStream.write(FileOutputStream.java:345)
   at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
   at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
   at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
   at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
   at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
   at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
   at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
   at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
   at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
   at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)

   Driver stacktrace

Re: refer to dictionary

2015-03-31 Thread Peng Xia
Hi Ted,

Thanks very much, yea, using broadcast is much faster.

Best,
Peng

On Tue, Mar 31, 2015 at 8:49 AM, Ted Yu yuzhih...@gmail.com wrote:

 You can use broadcast variable.

 See also this thread:

 http://search-hadoop.com/m/JW1q5GX7U22/Spark+broadcast+variablesubj=How+Broadcast+variable+scale+



  On Mar 31, 2015, at 4:43 AM, Peng Xia sparkpeng...@gmail.com wrote:
 
  Hi,
 
  I have an RDD (rdd1) where each line is split into an array [a, b, c], etc.
  I also have a local dictionary (dict1) that stores key-value pairs
 {a: 1, b: 2, c: 3}.
  I want to replace each key in the RDD with its corresponding value in
 the dict:
  rdd1.map(lambda line: [dict1[item] for item in line])
 
  But this task is not distributed; I believe the reason is that dict1 is a
 local instance.
  Can anyone suggest a way to parallelize this?
 
 
  Thanks,
  Best,
  Peng
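Ted's broadcast suggestion can be sketched in PySpark roughly as follows (a minimal sketch; the helper names are mine, and `sc` is assumed to be a live SparkContext when `translate_rdd` is called):

```python
def translate(line, mapping):
    # Pure helper: replace each token in a line with its value from the dict.
    return [mapping[item] for item in line]

def translate_rdd(sc, rdd1, dict1):
    # Ship the dictionary to the executors once as a read-only broadcast
    # variable instead of capturing the local dict in every task closure.
    b = sc.broadcast(dict1)
    return rdd1.map(lambda line: translate(line, b.value))
```

With a live context, `translate_rdd(sc, sc.parallelize([["a", "b", "c"]]), {"a": 1, "b": 2, "c": 3}).collect()` should give `[[1, 2, 3]]`, with the lookup running on the executors.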
 



Re: Can I start multiple executors in local mode?

2015-03-16 Thread xu Peng
Hi David,

You can try local-cluster mode.

The numbers in local-cluster[2,2,1024] mean 2 workers, 2 cores per worker,
and 1024 MB of memory per worker.

Best Regards

Peng Xu

2015-03-16 19:46 GMT+08:00 Xi Shen davidshe...@gmail.com:

 Hi,

 In YARN mode you can specify the number of executors. I wonder if we can
 also start multiple executors in local mode, just to make tests run faster.

 Thanks,
 David
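Xu Peng's `local-cluster[2,2,1024]` form can be built programmatically; a small sketch (the helper name is mine):

```python
def local_cluster_master(workers=2, cores_per_worker=2, mem_per_worker_mb=1024):
    # Builds a local-cluster master URL: it spawns `workers` separate worker
    # JVMs on the local machine, each with the given cores and memory (MB).
    return "local-cluster[%d,%d,%d]" % (workers, cores_per_worker, mem_per_worker_mb)
```

Passing `local_cluster_master()` as the master URL (e.g. `SparkContext(local_cluster_master(), "test")`) starts two local workers, whereas plain `local[N]` runs N threads inside a single JVM.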



Re: spark there is no space on the disk

2015-03-14 Thread Peng Xia
Hi Sean,

Thanks very much for your reply.
I tried to configure it with the code below:

sf = SparkConf().setAppName("test").set("spark.executor.memory",
"45g").set("spark.cores.max", "62").set("spark.local.dir", "C:\\tmp")

But I still get the error.
Do you know how I can configure this?


Thanks,
Best,
Peng


On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:

 It means pretty much what it says. You ran out of space on an executor
 (not driver), because the dir used for serialization temp files is
 full (not all volumes). Set spark.local.dir to something more
 appropriate and larger.
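Sean's fix would look roughly like this in PySpark (a sketch; the directory paths are illustrative, and the property name is `spark.local.dir`, which accepts a comma-separated list of directories):

```python
def local_dir_value(dirs):
    # spark.local.dir takes a comma-separated list of scratch directories;
    # point it at volumes with enough free space for shuffle/serialization
    # temp files.
    return ",".join(dirs)

def make_conf(dirs):
    from pyspark import SparkConf  # imported lazily; only needed to build the conf
    return (SparkConf()
            .setAppName("test")
            .set("spark.executor.memory", "45g")
            .set("spark.cores.max", "62")
            # every .set() must be chained with a dot -- a stray comma between
            # calls builds a tuple and the option is silently never applied
            .set("spark.local.dir", local_dir_value(dirs)))
```

The resulting conf is passed to `SparkContext(conf=make_conf(["D:\\tmp", "E:\\tmp"]))`.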

 On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote:
  Hi
 
 
  I was running a logistic regression algorithm on an 8-node Spark cluster;
  each node has 8 cores and 56 GB of RAM and runs Windows.
  The drive Spark is installed on has 1.9 TB of capacity. The dataset I was
  training on has around 40 million records with around 6600 features.
  But I always get this error during the training process:
 
  Py4JJavaError: An error occurred while calling
  o70.trainLogisticRegressionModelWithLBFGS.
  : org.apache.spark.SparkException: Job aborted due to stage failure: Task
  2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3
 in
  stage 3.0 (TID 2766,
  workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
  java.io.IOException: There is not enough space on the disk
  at java.io.FileOutputStream.writeBytes(Native Method)
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
  at
 java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
  at
 
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
  at
 
 org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
  at
  org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
  at
 
 java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
  at
  java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
  at
 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at
 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
  at
 
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
  at
  org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
  at
  org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at
  org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at
  org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
  at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
  at
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 
  Driver stacktrace:
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
  at
 
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
  at akka.actor.Actor$class.aroundReceive

Re: spark there is no space on the disk

2015-03-14 Thread Peng Xia
And I have 2 TB of free space on the C drive.

On Sat, Mar 14, 2015 at 8:29 PM, Peng Xia sparkpeng...@gmail.com wrote:

 Hi Sean,

 Thanks very much for your reply.
 I tried to configure it with the code below:

 sf = SparkConf().setAppName("test").set("spark.executor.memory",
 "45g").set("spark.cores.max", "62").set("spark.local.dir", "C:\\tmp")

 But I still get the error.
 Do you know how I can configure this?


 Thanks,
 Best,
 Peng


 On Sat, Mar 14, 2015 at 3:41 AM, Sean Owen so...@cloudera.com wrote:

 It means pretty much what it says. You ran out of space on an executor
 (not driver), because the dir used for serialization temp files is
 full (not all volumes). Set spark.local.dir to something more
 appropriate and larger.

 On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia sparkpeng...@gmail.com wrote:
  Hi
 
 
  I was running a logistic regression algorithm on an 8-node Spark cluster;
  each node has 8 cores and 56 GB of RAM and runs Windows.
  The drive Spark is installed on has 1.9 TB of capacity. The dataset I was
  training on has around 40 million records with around 6600 features.
  But I always get this error during the training process:
 
  Py4JJavaError: An error occurred while calling
  o70.trainLogisticRegressionModelWithLBFGS.
  : org.apache.spark.SparkException: Job aborted due to stage failure:
 Task
  2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3
 in
  stage 3.0 (TID 2766,
  workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
  java.io.IOException: There is not enough space on the disk
  at java.io.FileOutputStream.writeBytes(Native Method)
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
  at
 java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
  at
 
 org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
  at
 
 org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
  at
  org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
  at
 
 java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
  at
 
 java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
  at
  java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
  at
 
 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at
 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
  at
 
 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
  at
  org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
  at
  org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
  at
 
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
  at
  org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
  at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
  at
 org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
  at
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 
  Driver stacktrace:
  at
  org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
  at
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
  at scala.Option.foreach(Option.scala:236)
  at
 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696

Re: spark sql writing in avro

2015-03-13 Thread Kevin Peng
Markus,

Thanks.  That makes sense.  I was able to get this to work with spark-shell
passing in the git built jar.  I did notice that I couldn't get
AvroSaver.save to work with SQLContext, but it works with HiveContext.  Not
sure if that is an issue, but for me, it is fine.

Once again, thanks for the help.

Kevin

On Fri, Mar 13, 2015 at 1:57 PM, M. Dale medal...@yahoo.com wrote:

 I probably did not do a good enough job explaining the problem. If you
 used Maven with the
 default Maven repository you have an old version of spark-avro that does
 not contain AvroSaver and does not have the saveAsAvro method implemented:

 Assuming you use the default Maven repo location:
 cd ~/.m2/repository/com/databricks/spark-avro_2.10/0.1
 jar tvf spark-avro_2.10-0.1.jar | grep AvroSaver

 Comes up empty. The jar file does not contain this class because
 AvroSaver.scala wasn't added until Jan 21. The jar file is from 14 November.

 So:
 git clone g...@github.com:databricks/spark-avro.git
 cd spark-avro
 sbt publish-m2

 This publishes the latest master code (this includes AvroSaver etc.) to
 your local Maven repo and Maven will pick up the latest version of
 spark-avro (for this machine).

 Now you should be able to compile and run.

 HTH,
 Markus


 On 03/12/2015 11:55 PM, Kevin Peng wrote:

 Dale,

   I basically have the same Maven dependency above, but my code will not
  compile because it cannot reference AvroSaver, though the
  saveAsAvro reference compiles fine, which is weird.  Even though saveAsAvro
  compiles for me, it errors out when running the Spark job because it is not
  implemented (the job quits and reports an unimplemented method or
  something along those lines).

  I will try going the spark shell and passing in the jar built from
 github since I haven't tried that quite yet.

 On Thu, Mar 12, 2015 at 6:44 PM, M. Dale medal...@yahoo.com wrote:

 Short answer: if you downloaded spark-avro from the repo.maven.apache.org
 repo you might be using an old version (pre-November 14, 2014) -
 see timestamps at
 http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/
 Lots of changes at https://github.com/databricks/spark-avro since then.

 Databricks, thank you for sharing the Avro code!!!

 Could you please push out the latest version or update the version
 number and republish to repo.maven.apache.org (I have no idea how jars
 get
 there). Or is there a different repository that users should point to for
 this artifact?

 Workaround: Download from https://github.com/databricks/spark-avro and
 build
 with latest functionality (still version 0.1) and add to your local Maven
 or Ivy repo.

 Long version:
 I used a default Maven build and declared my dependency on:

 <dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-avro_2.10</artifactId>
   <version>0.1</version>
 </dependency>

 Maven downloaded the 0.1 version from
 http://repo.maven.apache.org/maven2/com/databricks/spark-avro_2.10/0.1/
 and included it in my app code jar.

 From spark-shell:

 import com.databricks.spark.avro._
 import org.apache.spark.sql.SQLContext
 val sqlContext = new SQLContext(sc)

 # This schema includes LONG for time in millis (
 https://github.com/medale/spark-mail/blob/master/mailrecord/src/main/avro/com/uebercomputing/mailrecord/MailRecord.avdl
 )
 val recordsSchema = sqlContext.avroFile(/opt/rpm1/enron/enron-tiny.avro)
 java.lang.RuntimeException: Unsupported type LONG

 However, checking out the spark-avro code from its GitHub repo and adding
 a test case against the MailRecord avro everything ran fine.

 So I built the databricks spark-avro locally on my box and then put it in
 my
 local Maven repo - everything worked from spark-shell when adding that jar
 as dependency.

 Hope this helps for the save case as well. On the pre-14NOV version,
 avro.scala
 says:
  // TODO: Implement me.
  implicit class AvroSchemaRDD(schemaRDD: SchemaRDD) {
    def saveAsAvroFile(path: String): Unit = ???
  }

 Markus

 On 03/12/2015 07:05 PM, kpeng1 wrote:

 Hi All,

  I am currently trying to write out a SchemaRDD to Avro.  I noticed that
  there is a Databricks spark-avro library and I have included it in my
  dependencies, but it looks like I am not able to access the AvroSaver
  object.  On compilation of the job I get this:
  error: not found: value AvroSaver
  [ERROR] AvroSaver.save(resultRDD, args(4))

  I also tried calling saveAsAvro on the resultRDD (the actual RDD with the
  results), and that passes compilation, but when I run the code I get an
  error saying saveAsAvro is not implemented.  I am using version 0.1 of
  spark-avro_2.10.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-writing-in-avro-tp22021.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr

spark there is no space on the disk

2015-03-13 Thread Peng Xia
Hi


I was running a logistic regression algorithm on an 8-node Spark cluster;
each node has 8 cores and 56 GB of RAM and runs Windows. The drive Spark is
installed on has 1.9 TB of capacity. The dataset I was training on has around
40 million records with around 6600 features. But I always get this error
during the training process:

Py4JJavaError: An error occurred while calling
o70.trainLogisticRegressionModelWithLBFGS.:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task
2709.3 in stage 3.0 (TID 2766,
workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net):
java.io.IOException: There is not enough space on the disk
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at 
org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
at 
org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
at 
org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
at 
java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
at 
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
at 
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Re: Loading in json with spark sql

2015-03-13 Thread Kevin Peng
Yin,

Yup thanks.  I fixed that shortly after I posted and it worked.

Thanks,

Kevin

On Fri, Mar 13, 2015 at 8:28 PM, Yin Huai yh...@databricks.com wrote:

 Seems you want to use array for the field of providers, like 
 providers:[{id:
 ...}, {id:...}] instead of providers:{{id: ...}, {id:...}}

 On Fri, Mar 13, 2015 at 7:45 PM, kpeng1 kpe...@gmail.com wrote:

 Hi All,

  I was noodling around with loading a JSON file into Spark SQL's hive
  context, and I noticed that I get the following message after loading
  the JSON file:
 PhysicalRDD [_corrupt_record#0], MappedRDD[5] at map at JsonRDD.scala:47

 I am using the HiveContext to load in the json file using the jsonFile
 command.  I also have 1 json object per line on the file.  Here is a
 sample
 of the contents in the json file:

 {user_id:7070,providers:{{id:8753,name:pjfig,behaviors:{b1:erwxt,b2:yjooj}},{id:8329,name:dfvhh,behaviors:{b1:pjjdn,b2:ooqsh

 {user_id:1615,providers:{{id:6105,name:rsfon,behaviors:{b1:whlje,b2:lpjnq}},{id:6828,name:pnmrb,behaviors:{b1:fjpmz,b2:dxqxk

 {user_id:5210,providers:{{id:9360,name:xdylm,behaviors:{b1:gcdze,b2:cndcs}},{id:4812,name:gxboh,behaviors:{b1:qsxao,b2:ixdzq




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Loading-in-json-with-spark-sql-tp22044.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
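Yin's point can be checked with plain Python's json module (a quick sketch; the values are shortened from the sample above):

```python
import json

# With "providers" as a JSON array of objects, parsing succeeds:
good = '{"user_id": "7070", "providers": [{"id": "8753"}, {"id": "8329"}]}'
rec = json.loads(good)
ids = [p["id"] for p in rec["providers"]]

# The {{...}, {...}} form in the original file is not valid JSON at all,
# which is why Spark SQL surfaces such rows as _corrupt_record:
bad = '{"user_id": "7070", "providers": {{"id": "8753"}, {"id": "8329"}}}'
try:
    json.loads(bad)
    parsed_bad = True
except ValueError:  # json.JSONDecodeError subclasses ValueError
    parsed_bad = False
```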





Re: spark sql writing in avro

2015-03-12 Thread Kevin Peng
Dale,

I basically have the same Maven dependency above, but my code will not
compile because it cannot reference AvroSaver, though the
saveAsAvro reference compiles fine, which is weird.  Even though saveAsAvro
compiles for me, it errors out when running the Spark job because it is not
implemented (the job quits and reports an unimplemented method or
something along those lines).

I will try going the spark shell and passing in the jar built from github
since I haven't tried that quite yet.

On Thu, Mar 12, 2015 at 6:44 PM, M. Dale medal...@yahoo.com wrote:

 Short answer: if you downloaded spark-avro from the repo.maven.apache.org
 repo you might be using an old version (pre-November 14, 2014) -
 see timestamps at http://repo.maven.apache.org/
 maven2/com/databricks/spark-avro_2.10/0.1/
 Lots of changes at https://github.com/databricks/spark-avro since then.

 Databricks, thank you for sharing the Avro code!!!

 Could you please push out the latest version or update the version
 number and republish to repo.maven.apache.org (I have no idea how jars get
 there). Or is there a different repository that users should point to for
 this artifact?

 Workaround: Download from https://github.com/databricks/spark-avro and
 build
 with latest functionality (still version 0.1) and add to your local Maven
 or Ivy repo.

 Long version:
 I used a default Maven build and declared my dependency on:

 <dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-avro_2.10</artifactId>
   <version>0.1</version>
 </dependency>

 Maven downloaded the 0.1 version from http://repo.maven.apache.org/
 maven2/com/databricks/spark-avro_2.10/0.1/ and included it in my app code
 jar.

 From spark-shell:

 import com.databricks.spark.avro._
 import org.apache.spark.sql.SQLContext
 val sqlContext = new SQLContext(sc)

 # This schema includes LONG for time in millis (https://github.com/medale/
 spark-mail/blob/master/mailrecord/src/main/avro/com/
 uebercomputing/mailrecord/MailRecord.avdl)
 val recordsSchema = sqlContext.avroFile(/opt/rpm1/enron/enron-tiny.avro)
 java.lang.RuntimeException: Unsupported type LONG

 However, checking out the spark-avro code from its GitHub repo and adding
 a test case against the MailRecord avro everything ran fine.

 So I built the databricks spark-avro locally on my box and then put it in
 my
 local Maven repo - everything worked from spark-shell when adding that jar
 as dependency.

 Hope this helps for the save case as well. On the pre-14NOV version,
 avro.scala
 says:
  // TODO: Implement me.
  implicit class AvroSchemaRDD(schemaRDD: SchemaRDD) {
    def saveAsAvroFile(path: String): Unit = ???
  }

 Markus

 On 03/12/2015 07:05 PM, kpeng1 wrote:

 Hi All,

  I am currently trying to write out a SchemaRDD to Avro.  I noticed that
  there is a Databricks spark-avro library and I have included it in my
  dependencies, but it looks like I am not able to access the AvroSaver
  object.  On compilation of the job I get this:
  error: not found: value AvroSaver
  [ERROR] AvroSaver.save(resultRDD, args(4))

  I also tried calling saveAsAvro on the resultRDD (the actual RDD with the
  results), and that passes compilation, but when I run the code I get an
  error saying saveAsAvro is not implemented.  I am using version 0.1 of
  spark-avro_2.10.




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/spark-sql-writing-in-avro-tp22021.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: error on training with logistic regression sgd

2015-03-10 Thread Peng Xia
Hi,

Can anyone give an idea about this?
From some Google searching, it seems related to the 2 GB limitation on
block size: https://issues.apache.org/jira/browse/SPARK-1476.
The whole process is:
1. load the data
2. convert each line of data into labeled points using a feature hashing
algorithm in Python.
3. train a logistic regression model with the converted labeled points.
Can anyone advise how to avoid the 2 GB limit, if this is the cause?
Thanks very much for the help.

Best,
Peng
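Step 2 (feature hashing) can be sketched in plain Python; this is a generic illustration, not Peng's actual code, and the digest-based hash is my choice so indices are stable across runs:

```python
import hashlib

def hash_features(tokens, num_features=2**14):
    # The hashing trick: map each token into a fixed-size index space by
    # hashing it, accumulating counts in a sparse dict {index: count}.
    vec = {}
    for tok in tokens:
        idx = int(hashlib.md5(tok.encode("utf-8")).hexdigest(), 16) % num_features
        vec[idx] = vec.get(idx, 0.0) + 1.0
    return vec
```

Each dict can then be converted into a SparseVector and wrapped in a LabeledPoint before calling trainLogisticRegressionModelWithSGD.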

On Mon, Mar 9, 2015 at 3:54 PM, Peng Xia sparkpeng...@gmail.com wrote:

 Hi,

 I launched a Spark cluster with 4 worker nodes, each with 8 cores and
 56 GB of RAM, and I was testing my logistic regression problem.
 The training set is around 1.2 million records. When I use 2**10
 (1024) features, the whole program works fine, but when I use 2**14
 features, the program encounters this error:

 Py4JJavaError: An error occurred while calling 
 o84.trainLogisticRegressionModelWithSGD.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 
 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 
 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): 
 java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
 Integer.MAX_VALUE
   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
   at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
   at 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
   at 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
   at 
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
   at 
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
   at 
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
   at 
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
   at 
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
   at 
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
   at 
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
   at 
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
   at 
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
   at 
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
   at 
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
   at java.lang.Thread.run(Thread.java:745)

   at 
 org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156)
   at 
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93
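One common workaround for SPARK-1476 (Spark of this era caps a single block at Integer.MAX_VALUE bytes) is to repartition so no serialized block approaches 2 GiB. A back-of-the-envelope helper (my own sketch; it assumes dense 8-byte doubles, the worst case for hashed feature vectors):

```python
MAX_BLOCK_BYTES = 2**31 - 1  # Integer.MAX_VALUE: the per-block limit behind SPARK-1476

def min_partitions(total_bytes, target_block_bytes=256 * 1024**2):
    # Ceiling division: enough partitions that each block stays near the
    # 256 MiB target, far below the 2 GiB cap.
    return max(1, -(-total_bytes // target_block_bytes))

# ~1.2M rows x 2**14 features x 8 bytes/double, if stored dense:
total = 1_200_000 * 2**14 * 8
needed = min_partitions(total)
```

Calling `rdd.repartition(needed)` before caching/training keeps blocks small; using SparseVectors for hashed features shrinks `total_bytes` dramatically and is usually the first thing to try.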

error on training with logistic regression sgd

2015-03-09 Thread Peng Xia
)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



The data are transformed to LabeledPoint and I was using pyspark for this.
Can anyone help me on this?


Thanks,
Best,
Peng


Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Marcelo,

Thanks.  The one in the CDH repo fixed it :)

On Wed, Mar 4, 2015 at 4:37 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Kevin,

 If you're using CDH, I'd recommend using the CDH repo [1], and also
 the CDH version when building your app.

 [1]
 http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html

 On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng kpe...@gmail.com wrote:
  Ted,
 
  I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
  sure about the compatibility issues between 1.2.0 and 1.2.1; that is why I
  would want to stick to 1.2.0.
 
 
 
  On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote:
 
  Kevin:
  You can try with 1.2.1
 
  See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
 
  Cheers
 
  On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote:
 
  Marcelo,
 
  Yes that is correct, I am going through a mirror, but 1.1.0 works
  properly, while 1.2.0 does not.  I suspect there is a crc issue in the
  1.2.0 pom file.
 
  On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com
  wrote:
 
  Seems like someone set up m2.mines.com as a mirror in your pom file
  or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
  in a messed up state).
 
  On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote:
   Hi All,
  
   I am currently having problem with the maven dependencies for version 1.2.0
   of spark-core and spark-hive.  Here are my dependencies:
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.2.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-hive_2.10</artifactId>
     <version>1.2.0</version>
   </dependency>
  
   When the dependencies are set to version 1.1.0, I do not get any
   errors.
    Here are the errors I am getting from artifactory for version 1.2.0 of
    spark-core:
   error=Could not transfer artifact
   org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
   (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
   file\:
  
   https\://m2.mines.com
 \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
   Return code is\: 409 , ReasonPhrase\:Conflict.
  
   The error is the same for spark-hive.
  
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
  
 -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 
 
  --
  Marcelo
 
 
 
 



 --
 Marcelo
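For anyone hitting a similar 409 from an internal mirror: besides repairing the artifact in the mirror, you can stop routing Maven Central artifacts through it at all. A hypothetical ~/.m2/settings.xml fragment (the mirror id and URL below are placeholders, not anyone's real setup) that mirrors everything except Central, so Spark artifacts resolve directly:

```xml
<!-- Hypothetical ~/.m2/settings.xml: keep using the company mirror,
     but let requests for Maven Central bypass it. -->
<settings>
  <mirrors>
    <mirror>
      <id>company-mirror</id>
      <url>https://m2.example.com:8443/artifactory/repo</url>
      <!-- "*,!central" = mirror all repositories except the one with id "central" -->
      <mirrorOf>*,!central</mirrorOf>
    </mirror>
  </mirrors>
</settings>
```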



Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Ted,

I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I
would want to stick to 1.2.0.



On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote:

 Kevin:
 You can try with 1.2.1

 See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1

 Cheers

 On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote:

 Marcelo,

 Yes that is correct, I am going through a mirror, but 1.1.0 works
 properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
 file.

 On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Seems like someone set up m2.mines.com as a mirror in your pom file
 or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
 in a messed up state).

 On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote:
  Hi All,
 
  I am currently having problem with the maven dependencies for version 1.2.0
  of spark-core and spark-hive.  Here are my dependencies:
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
 
  When the dependencies are set to version 1.1.0, I do not get any
 errors.
  Here are the errors I am getting from artifactory for version 1.2.0 of
  spark-core:
  error=Could not transfer artifact
  org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
  (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
 file\:
  https\://m2.mines.com
 \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
  Return code is\: 409 , ReasonPhrase\:Conflict.
 
  The error is the same for spark-hive.
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



 --
 Marcelo






Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
I double-checked the 1.2 feature list and found out that the new sort-based
shuffle manager has nothing to do with HashPartitioner :( Sorry for the
misinformation.

On the other hand, this may explain the increase in shuffle spill as a side
effect of the new shuffle manager. Let me revert spark.shuffle.manager to hash
and see if it makes things better (or worse, as the benchmark in
https://issues.apache.org/jira/browse/SPARK-3280 indicates).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle write increases in spark 1.2

2015-02-14 Thread Peng Cheng
Same problem here: shuffle write increased from 10G to over 64G. Since I'm
running on Amazon EC2, this always causes the temporary folder to consume all
the disk space. Still looking for a solution.

BTW, the 64G shuffle write was encountered while shuffling a pairRDD with
HashPartitioner, so it's not related to Spark 1.2.0's new features.

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-write-increases-in-spark-1-2-tp20894p21656.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?

2015-02-11 Thread Peng Cheng
You are right. I've checked the overall stage metrics and it looks like the
largest shuffle write is over 9G. The partition completed successfully,
but its spilled file can't be removed until all others are finished.
It's very likely caused by a stupid mistake in my design: a lookup table
grows constantly in a loop, and every time it is unioned with a new increment,
both of them get reshuffled and the partitioner reverts to None.
This can never be efficient with the existing API.
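The pattern Peng describes can be reproduced outside Spark: if the growing lookup table stays lazy (like an unpersisted RDD), every evaluation replays the whole union lineage, and materializing (checkpointing) each iteration truncates it. A small pure-Python analogue (this mimics lazy lineage with thunks; it is not Spark API, and `evals` is just a counter):

```python
evals = 0

def source(i):
    """Stand-in for one expensive increment of the lookup table."""
    global evals
    evals += 1
    return [i]

# Lazy "lineage": each union wraps the previous table in a thunk,
# like unioning unpersisted RDDs in a loop.
table = lambda: []
for i in range(1, 6):
    prev = table
    table = (lambda p, j: (lambda: p() + source(j)))(prev, i)

table(); table()      # every evaluation replays the whole chain
assert evals == 10    # 5 increments x 2 evaluations

# "Checkpointing": materialize the table each iteration to cut the lineage.
evals = 0
materialized = []
for i in range(1, 6):
    materialized = materialized + source(i)
assert evals == 5     # each increment computed exactly once
```

In Spark terms, the materialization step corresponds to persisting (or checkpointing) the unioned table each iteration, so later iterations do not re-evaluate the entire chain.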


Why does spark write huge file into temporary local disk even without on-disk persist or checkpoint?

2015-02-10 Thread Peng Cheng
I'm running a small job on a cluster with 15G of mem and 8G of disk per
machine.

The job always get into a deadlock where the last error message is:

java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
at org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

By the time it happens the shuffle write size is 0.0B and input size
is 3.4MB. I wonder what operation could quickly eat up the entire 5G
free disk space.

In addition, the storage level of the entire job is confined to
MEMORY_ONLY_SERIALIZED and checkpointing is completely disabled.


Is LogisticRegressionWithSGD in MLlib scalable?

2015-02-03 Thread Peng Zhang
Hi Everyone,

Is LogisticRegressionWithSGD in MLlib scalable? 

If so, what is the idea behind the scalable implementation?

Thanks in advance,

Peng





-
Peng Zhang
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-LogisticRegressionWithSGD-in-MLlib-scalable-tp21482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
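For what it's worth, the idea behind making SGD-style training scale is data parallelism: each iteration computes a partial gradient sum per partition (the map side) and combines those sums (the reduce side), so per-iteration communication is proportional to the feature dimension, not the number of examples. A pure-Python sketch of that iteration loop on toy data (this is an illustration of the idea, not the MLlib implementation):

```python
import math
from functools import reduce

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def grad(w, x, y):
    """Gradient of the logistic loss for a single labeled example."""
    err = sigmoid(sum(wi * xi for wi, xi in zip(w, x))) - y
    return [xi * err for xi in x]

def vadd(a, b):
    return [p + q for p, q in zip(a, b)]

# Stand-in for an RDD of LabeledPoints: examples split across partitions.
partitions = [
    [([1.0, 0.0], 1.0), ([0.9, 0.1], 1.0)],   # positive class
    [([0.0, 1.0], 0.0), ([0.1, 0.9], 0.0)],   # negative class
]
n = sum(len(p) for p in partitions)

w = [0.0, 0.0]
for _ in range(100):
    # Map side (parallel across the cluster in Spark):
    # one partial gradient sum per partition.
    partials = [reduce(vadd, [grad(w, x, y) for x, y in part])
                for part in partitions]
    # Reduce side: only len(w)-sized vectors get combined, so the
    # communication cost per iteration is O(dimension), not O(examples).
    total = reduce(vadd, partials)
    w = [wi - 1.0 * g / n for wi, g in zip(w, total)]

# After training, a positive example scores above 0.5.
assert sigmoid(sum(wi * xi for wi, xi in zip(w, [1.0, 0.0]))) > 0.5
```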



Re: java.lang.IllegalStateException: unread block data

2015-02-02 Thread Peng Cheng
I got the same problem, maybe java serializer is unstable



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p21463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) --> RDD1 --> RDD2
                    |         |
                    V         V
                   RDD3 ---> RDD4 --> Action!

In my experience, the number of times RDD1 gets recalculated varies: sometimes
once, sometimes twice. When calculation of this RDD is expensive (e.g. it
involves using a RESTful service that charges me money), this compels me to
persist RDD1, which takes extra memory, and in case the Action! doesn't
always happen, I don't know when to unpersist it to free that memory.

A related problem might be in SQLContext.jsonRDD(), since the source
jsonRDD is used twice (once for schema inference, once for the data read). It
almost guarantees that the source jsonRDD is calculated twice. Has this
problem been addressed so far?
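The usual workaround remains an explicit persist. A pure-Python sketch of why an unpersisted DAG evaluates RDD1 once per downstream branch, and how caching changes that (this mimics lazy evaluation with plain functions; it is not Spark code, and `count` is just a counter):

```python
count = 0

def rdd1():
    """Stand-in for the expensive RDD1 (e.g. a paid RESTful call)."""
    global count
    count += 1
    return [2, 4, 6, 8]

# Unpersisted: RDD2 and RDD3 each pull RDD1 through the lineage.
rdd2 = lambda: [x + 1 for x in rdd1()]
rdd3 = lambda: [x for x in rdd1() if x > 4]
action = rdd2() + rdd3()       # one action, two branches
assert count == 2              # RDD1 computed once per branch

# "Persisted": compute RDD1 once and let both branches reuse the value.
count = 0
cached = rdd1()                # analogous to persist() + first use
rdd2b = lambda: [x + 1 for x in cached]
rdd3b = lambda: [x for x in cached if x > 4]
action2 = rdd2b() + rdd3b()
assert count == 1              # RDD1 computed exactly once
# cached = None                # analogous to unpersist() after the action
```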



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/If-an-RDD-appeared-twice-in-a-DAG-of-which-calculation-is-triggered-by-a-single-action-will-this-RDD-tp21192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) --> RDD1 --> RDD2
                    |         |
                    V         V
                   RDD3 ---> RDD4 --> Action!

In my experience, the number of times RDD1 gets recalculated varies: sometimes
once, sometimes twice. When calculation of this RDD is expensive (e.g. it
involves using a RESTful service that charges me money), this compels me
to persist RDD1, which takes extra memory, and in case the Action! doesn't
always happen, I don't know when to unpersist it to free that memory.

A related problem might be in SQLContext.jsonRDD(), since the source
jsonRDD is used twice (once for schema inference, once for the data read). It
almost guarantees that the source jsonRDD is calculated twice. Is there a
way to solve (or circumvent) this problem?


Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
Not if broadcast can only be used between stages. To enable this you have
to at least make broadcast asynchronous & non-blocking.

On 9 January 2015 at 18:02, Krishna Sankar ksanka...@gmail.com wrote:

 I am also looking at this domain. We could potentially use the broadcast
 capability in Spark to distribute the parameters. Haven't thought thru yet.
 Cheers
 k/

 On Fri, Jan 9, 2015 at 2:56 PM, Andrei faithlessfri...@gmail.com wrote:

 Does it makes sense to use Spark's actor system (e.g. via
 SparkContext.env.actorSystem) to create parameter server?

 On Fri, Jan 9, 2015 at 10:09 PM, Peng Cheng rhw...@gmail.com wrote:

 You are not the first :) probably not the fifth to have the question.
 parameter server is not included in spark framework and I've seen all
 kinds of hacking to improvise it: REST api, HDFS, tachyon, etc.
  Not sure if an 'official' benchmark & implementation will be released
  soon

 On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:


 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 DeepLearning algorithms are popular and achieve many state of the art
 performance in several real world machine learning problems. Currently
 there are no DL implementation in spark and I wonder if there is an ongoing
 work on this topics.

 We can do DL in spark Sparkling water and H2O but this adds an
 additional software stack.

 Deeplearning4j seems to implements a distributed version of many
 popural DL algorithm. Porting DL4j in Spark can be interesting.

 Google describes an implementation of a large scale DL in this paper
 http://research.google.com/archive/large_deep_networks_nips2012.html.
 Based on model parallelism and data parallelism.

 So, I'm trying to imaging what should be a good design for DL algorithm
 in Spark ? Spark already have RDD (for data parallelism). Can GraphX be
 used for the model parallelism (as DNN are generally designed as DAG) ? And
 what about using GPUs to do local parallelism (mecanism to push partition
 into GPU memory ) ?


 What do you think about this ?


 Cheers,

 Jao







Re: DeepLearning and Spark ?

2015-01-09 Thread Peng Cheng
You are not the first :) probably not the fifth to have the question.
parameter server is not included in spark framework and I've seen all kinds
of hacking to improvise it: REST api, HDFS, tachyon, etc.
Not sure if an 'official' benchmark & implementation will be released soon

On 9 January 2015 at 10:59, Marco Shaw marco.s...@gmail.com wrote:

 Pretty vague on details:

 http://www.datasciencecentral.com/m/blogpost?id=6448529%3ABlogPost%3A227199


 On Jan 9, 2015, at 11:39 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi all,

 DeepLearning algorithms are popular and achieve many state of the art
 performance in several real world machine learning problems. Currently
 there are no DL implementation in spark and I wonder if there is an ongoing
 work on this topics.

 We can do DL in spark Sparkling water and H2O but this adds an additional
 software stack.

 Deeplearning4j seems to implements a distributed version of many popural
 DL algorithm. Porting DL4j in Spark can be interesting.

 Google describes an implementation of a large scale DL in this paper
 http://research.google.com/archive/large_deep_networks_nips2012.html.
 Based on model parallelism and data parallelism.

 So, I'm trying to imaging what should be a good design for DL algorithm in
 Spark ? Spark already have RDD (for data parallelism). Can GraphX be used
 for the model parallelism (as DNN are generally designed as DAG) ? And what
 about using GPUs to do local parallelism (mecanism to push partition into
 GPU memory ) ?


 What do you think about this ?


 Cheers,

 Jao




Re: Is it possible to do incremental training using ALSModel (MLlib)?

2015-01-02 Thread Peng Cheng
I was under the impression that ALS wasn't designed for it :( The famous
eBay online recommender uses SGD.
However, you can try using the previous model as a starting point, and
gradually reduce the number of iterations after the model stabilizes. I never
verified this idea, so you need to at least cross-validate it before putting it
into production.

On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be
wrote:

 Hi all,

 I'm curious about MLlib and if it is possible to do incremental training on
 the ALSModel.

 Usually training is run first, and then you can query. But in my case, data
 is collected in real-time and I want the predictions of my ALSModel to
 consider the latest data without complete re-training phase.

 I've checked out these resources, but could not find any info on how to
 solve this:
 https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

 http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html

 My question fits in a larger picture where I'm using Prediction IO, and
 this
 in turn is based on Spark.

 Thanks in advance for any advice!

 Wouter



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark-repl_1.2.0 was not uploaded to central maven repository.

2014-12-22 Thread peng

Thanks a lot for pointing it out. I also found it in pom.xml.
A new ticket for reverting it has been submitted: 
https://issues.apache.org/jira/browse/SPARK-4923
At first I assumed that further development on it had been moved to 
Databricks Cloud. But the JIRA ticket was already there in September, so 
maybe demand for this API from the community is indeed low enough.
However, I would still suggest keeping it, even promoting it to a 
Developer's API; this would encourage more projects to integrate in a 
more flexible way, and save prototyping/QA cost by customizing fixtures 
of the REPL. People will still move to Databricks Cloud, which has far more 
features than that. Many influential projects already depend on the 
routinely published Scala REPL (e.g. Play Framework); it would be strange for 
Spark not to do the same.

What do you think?

Yours Peng

On 12/22/2014 04:57 PM, Sean Owen wrote:

Just closing the loop -- FWIW this was indeed on purpose --
https://issues.apache.org/jira/browse/SPARK-3452 . I take it that it's
not encouraged to depend on the REPL as a module.

On Sun, Dec 21, 2014 at 10:34 AM, Sean Owen so...@cloudera.com wrote:

I'm only speculating, but I wonder if it was on purpose? would people
ever build an app against the REPL?

On Sun, Dec 21, 2014 at 5:50 AM, Peng Cheng pc...@uow.edu.au wrote:

Everything else is there except spark-repl. Can someone check that out this
weekend?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-repl-1-2-0-was-not-uploaded-to-central-maven-repository-tp20799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Announcing Spark Packages

2014-12-22 Thread peng

Me 2 :)

On 12/22/2014 06:14 PM, Andrew Ash wrote:

Hi Xiangrui,

That link is currently returning a 503 Over Quota error message.  
Would you mind pinging back out when the page is back up?


Thanks!
Andrew

On Mon, Dec 22, 2014 at 12:37 PM, Xiangrui Meng men...@gmail.com wrote:


Dear Spark users and developers,

I’m happy to announce Spark Packages (http://spark-packages.org), a
community package index to track the growing number of open source
packages and libraries that work with Apache Spark. Spark Packages
makes it easy for users to find, discuss, rate, and install packages
for any version of Spark, and makes it easy for developers to
contribute packages.

Spark Packages will feature integrations with various data sources,
management tools, higher level domain-specific libraries, machine
learning algorithms, code samples, and other Spark content. Thanks to
the package authors, the initial listing of packages includes
scientific computing libraries, a job execution server, a connector
for importing Avro data, tools for launching Spark on Google Compute
Engine, and many others.

I’d like to invite you to contribute and use Spark Packages and
provide feedback! As a disclaimer: Spark Packages is a community index
maintained by Databricks and (by design) will include packages outside
of the ASF Spark project. We are excited to help showcase and support
all of the great work going on in the broader Spark community!

Cheers,
Xiangrui

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






spark-repl_1.2.0 was not uploaded to central maven repository.

2014-12-20 Thread Peng Cheng
Everything else is there except spark-repl. Can someone check that out this
weekend?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-repl-1-2-0-was-not-uploaded-to-central-maven-repository-tp20799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark on Tachyon

2014-12-20 Thread Peng Cheng
IMHO: cache doesn't provide redundancy, and it's in the same JVM, so it's much
faster.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Tachyon-tp1463p20800.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to extend an one-to-one RDD of Spark that can be persisted?

2014-12-04 Thread Peng Cheng
In my project I extend a new RDD type that wraps another RDD and some
metadata. The code I use is similar to FilteredRDD implementation:

case class PageRowRDD(
   self: RDD[PageRow],
   @transient keys: ListSet[KeyLike] = ListSet()
){
  override def getPartitions: Array[Partition] =
firstParent[PageRow].partitions

  override val partitioner = self.partitioner

  override def compute(split: Partition, context: TaskContext) =
firstParent[PageRow].iterator(split, context)
}
However when I try to persist and reuse it in 2 transformations. My logs and
debug shows that it is being computed twice, rather than being reused in
memory.

The problem is: there is no such problem for FilteredRDD. How do I avoid
this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-extend-an-one-to-one-RDD-of-Spark-that-can-be-persisted-tp20394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
I have a spark application that serializes an object 'Seq[Page]' and saves it
to HDFS/S3, to be read by another worker and used elsewhere. The serialization
and deserialization use the same serializer as Spark itself (read from
SparkEnv.get.serializer.newInstance()).

However I sporadically get this error:

java.lang.ClassNotFoundException: org.***.***.Page
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

It seems like the Page class wasn't shipped in the jar to the executor, and
all its information was erased at runtime.

The weirdest thing: this error doesn't always happen; sometimes the old
Seq[Page] is read properly, sometimes it throws the exception. How could
this happen, and how do I fix it?

Yours Peng
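One common mitigation for this class of error is to ship the jar that contains the missing class to every executor explicitly, rather than relying on it already being on the workers' classpath. A sketch of the invocation (the main class, master URL, and jar paths below are hypothetical; --jars distributes the listed jars to the driver and executor classpaths):

```shell
# Hypothetical submit command: pages-model.jar is assumed to contain
# the Page class that the executors fail to resolve.
spark-submit \
  --class org.example.MyApp \
  --master spark://master:7077 \
  --jars /path/to/pages-model.jar \
  /path/to/myapp-assembly.jar
```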



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: How to make sure a ClassPath is always shipped to workers?

2014-11-03 Thread Peng Cheng
Sorry, it's a timeout duplicate; please remove it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018p18020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
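
A side note on the ClassNotFoundException in the thread above: the usual cause is that the jar containing the Page class never reaches the executors. A minimal sketch of registering the application jar explicitly with the SparkConf; the jar path and object name are placeholders, not taken from the original post:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShipClasses {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ShipClasses")
      // Ship the application jar (which contains Page) to every executor.
      // "/path/to/app-assembly.jar" is a placeholder for your build output.
      .setJars(Seq("/path/to/app-assembly.jar"))
    val sc = new SparkContext(conf)
    // ... deserialize Seq[Page] on the workers as before ...
    sc.stop()
  }
}
```

When launching through spark-submit, the primary application jar is shipped automatically, and extra dependency jars can be added with --jars.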



issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi,



Previously we applied the SVM algorithm in MLlib to 5 million records (600
MB), and it takes more than 25 minutes to finish.
The Spark version we are using is 1.0, and we ran this program on a 4-node
cluster. Each node has 4 CPU cores and 11 GB of RAM.

The 5 million records contain only two distinct records (one positive and one
negative); the others are all duplicates.

Does anyone have any idea why it takes so long on such small data?



Thanks,
Best,

Peng


Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks for all your help.
I think I didn't cache the data. My previous cluster has expired, so I didn't
have a chance to check the load balancing or the app manager.
Below is my code.
There are 18 features for each record, and I am using the Scala API.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import java.util.Calendar

object BenchmarkClassification {
  def main(args: Array[String]) {
    // Load and parse the data file
    val conf = new SparkConf()
      .setAppName("SVM")
      .set("spark.executor.memory", "8g")
      // .set("spark.executor.extraJavaOptions", "-Xms8g -Xmx8g")
    val sc = new SparkContext(conf)
    val data = sc.textFile(args(0))
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x => x.toDouble)))
    }
    val testData = sc.textFile(args(1))
    val testParsedData = testData.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x => x.toDouble)))
    }

    // Run training algorithm to build the model
    val numIterations = 20
    val model = SVMWithSGD.train(parsedData, numIterations)

    // Evaluate model on training examples and compute training error
    // val labelAndPreds = testParsedData.map { point =>
    //   val prediction = model.predict(point.features)
    //   (point.label, prediction)
    // }
    // val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testParsedData.count
    // println("Training Error = " + trainErr)
    println(Calendar.getInstance().getTime())
  }
}




Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:

 DId you cache the data and check the load balancing? How many
 features? Which API are you using, Scala, Java, or Python? -Xiangrui

 On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
  Watch the app manager it should tell you what's running and taking
 awhile...
  My guess it's a distinct function on the data.
  J
 
  Sent from my iPhone
 
  On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
 
  Hi,
 
 
 
  Previous we have applied SVM algorithm in MLlib to 5 million records (600
  mb), it takes more than 25 minutes to finish.
  The spark version we are using is 1.0 and we were running this program
 on a
  4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
 
  The 5 million records only have two distinct records (One positive and
 one
  negative), others are all duplications.
 
  Any one has any idea on why it takes so long on this small data?
 
 
 
  Thanks,
  Best,
 
  Peng



Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Hi Xiangrui,

Can you give me some code example about caching, as I am new to Spark.

Thanks,
Best,
Peng

On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui

 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster was expired and I
 don't
  have a chance to check the load balance or app manager.
  Below is my code.
  There are 18 features for each record and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName(SVM)
   .set(spark.executor.memory, 8g)
   // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g)
 val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData .map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r = r._1 != r._2).count.toDouble
 /
  testParsedData.count
  // println(Training Error =  + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com wrote:
 
  DId you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager it should tell you what's running and taking
   awhile...
   My guess it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previous we have applied SVM algorithm in MLlib to 5 million records
   (600
   mb), it takes more than 25 minutes to finish.
   The spark version we are using is 1.0 and we were running this program
   on a
   4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
  
   The 5 million records only have two distinct records (One positive and
   one
   negative), others are all duplications.
  
   Any one has any idea on why it takes so long on this small data?
  
  
  
   Thanks,
   Best,
  
   Peng
 
 



Re: issue on applying SVM to 5 million examples.

2014-10-30 Thread peng xia
Thanks Jimmy.
I will have a try.
Thanks very much for your guys' help.

Best,
Peng

On Thu, Oct 30, 2014 at 8:19 PM, Jimmy ji...@sellpoints.com wrote:

 sampleRDD. cache()

 Sent from my iPhone

 On Oct 30, 2014, at 5:01 PM, peng xia toxiap...@gmail.com wrote:

 Hi Xiangrui,

 Can you give me some code example about caching, as I am new to Spark.

 Thanks,
 Best,
 Peng

 On Thu, Oct 30, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Then caching should solve the problem. Otherwise, it is just loading
 and parsing data from disk for each iteration. -Xiangrui

 On Thu, Oct 30, 2014 at 11:44 AM, peng xia toxiap...@gmail.com wrote:
  Thanks for all your help.
  I think I didn't cache the data. My previous cluster was expired and I
 don't
  have a chance to check the load balance or app manager.
  Below is my code.
  There are 18 features for each record and I am using the Scala API.
 
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd._
  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.linalg.Vectors
  import java.util.Calendar
 
  object BenchmarkClassification {
  def main(args: Array[String]) {
  // Load and parse the data file
  val conf = new SparkConf()
   .setAppName(SVM)
   .set(spark.executor.memory, 8g)
   // .set(spark.executor.extraJavaOptions, -Xms8g -Xmx8g)
 val sc = new SparkContext(conf)
  val data = sc.textFile(args(0))
  val parsedData = data.map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
  val testData = sc.textFile(args(1))
  val testParsedData = testData .map { line =
   val parts = line.split(',')
   LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(x =
  x.toDouble)))
  }
 
  // Run training algorithm to build the model
  val numIterations = 20
  val model = SVMWithSGD.train(parsedData, numIterations)
 
  // Evaluate model on training examples and compute training error
  // val labelAndPreds = testParsedData.map { point =
  //   val prediction = model.predict(point.features)
  //   (point.label, prediction)
  // }
  // val trainErr = labelAndPreds.filter(r = r._1 !=
 r._2).count.toDouble /
  testParsedData.count
  // println(Training Error =  + trainErr)
  println(Calendar.getInstance().getTime())
  }
  }
 
 
 
 
  Thanks,
  Best,
  Peng
 
  On Thu, Oct 30, 2014 at 1:23 PM, Xiangrui Meng men...@gmail.com
 wrote:
 
  DId you cache the data and check the load balancing? How many
  features? Which API are you using, Scala, Java, or Python? -Xiangrui
 
  On Thu, Oct 30, 2014 at 9:13 AM, Jimmy ji...@sellpoints.com wrote:
   Watch the app manager it should tell you what's running and taking
   awhile...
   My guess it's a distinct function on the data.
   J
  
   Sent from my iPhone
  
   On Oct 30, 2014, at 8:22 AM, peng xia toxiap...@gmail.com wrote:
  
   Hi,
  
  
  
   Previous we have applied SVM algorithm in MLlib to 5 million records
   (600
   mb), it takes more than 25 minutes to finish.
   The spark version we are using is 1.0 and we were running this
 program
   on a
   4 nodes cluster. Each node has 4 cpu cores and 11 GB RAM.
  
   The 5 million records only have two distinct records (One positive
 and
   one
   negative), others are all duplications.
  
   Any one has any idea on why it takes so long on this small data?
  
  
  
   Thanks,
   Best,
  
   Peng
 
 



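A minimal sketch of the caching advice given in this thread, reusing the imports and the `data`/`numIterations` definitions from the program posted above; only the `StorageLevel` import is new:

```scala
import org.apache.spark.storage.StorageLevel

// Cache the parsed training data so each of the 20 SGD iterations reuses the
// in-memory copy instead of re-reading and re-parsing the text file.
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
}.persist(StorageLevel.MEMORY_ONLY)  // .cache() is shorthand for this

val model = SVMWithSGD.train(parsedData, numIterations)
```

If the cached data may not fit in memory, `StorageLevel.MEMORY_AND_DISK` is a safer choice.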


Re: Asynchronous Broadcast from driver to workers, is it possible?

2014-10-21 Thread Peng Cheng
Looks like the only way is to implement that feature; there is no way of
hacking it into working.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758p16985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Asynchronous Broadcast from driver to workers, is it possible?

2014-10-06 Thread Peng Cheng
Any suggestions? I'm thinking of submitting a feature request for mutable
broadcast. Is it doable?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758p15807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Asynchronous Broadcast from driver to workers, is it possible?

2014-10-04 Thread Peng Cheng
While Spark already offers support for asynchronous reduce (collecting data from
workers without interrupting execution of a parallel transformation) through
accumulators, I have made little progress on making this process reciprocal,
namely broadcasting data from the driver to the workers to be used by all
executors in the middle of a transformation. This is primarily intended for
downpour SGD/AdaGrad, a non-blocking concurrent machine-learning optimizer that
performs better than the existing synchronous GD in MLlib and has broad
application in training many models.

My attempt so far is to stick to the out-of-the-box, immutable broadcast: open a
new thread on the driver, in which I broadcast a thin data wrapper that, when
deserialized, inserts itself into a mutable singleton that is already replicated
to all workers in the fat jar. This customized deserialization is not hard; just
override readObject like this:

class AutoInsert(var value: Int) extends Serializable {

  WorkerReplica.last = value

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    WorkerReplica.last = this.value
  }
}

Unfortunately it looks like the deserialization is invoked lazily and won't do
anything before a worker uses the Broadcast[AutoInsert]; this is impossible
without waiting for the workers' stage to finish and broadcasting again. I'm
wondering if I can 'hack' this thing into working, or whether I'll have to write
a serious extension to the broadcast component to enable changing the value.

Hope I can find the like-minded on this forum, because ML is a selling point of
Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-tp15758.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
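
The closest supported pattern to the "mutable broadcast" requested in this thread is to create a fresh broadcast per round on the driver and unpersist the stale one; it does not give mid-transformation updates, but it avoids hacking the deserialization path. A sketch with a hypothetical iterative driver loop (the variable names are illustrative, not from the original post):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RebroadcastLoop {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RebroadcastLoop"))
    val data = sc.parallelize(1 to 10)

    var weights = 0                        // driver-side state updated each round
    var broadcasted = sc.broadcast(weights)

    for (iter <- 1 to 5) {
      // Transformations defined in this round capture the current broadcast.
      val contribution = data.map(_ + broadcasted.value).reduce(_ + _)
      weights += contribution
      broadcasted.unpersist()              // drop the stale copies on the workers
      broadcasted = sc.broadcast(weights)  // the next round sees the new value
    }
    sc.stop()
  }
}
```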



[no subject]

2014-09-30 Thread PENG ZANG
Hi,

We have a cluster set up with Spark 1.0.2 running 4 workers and 1 master,
with 64 GB RAM each. In the SparkContext we specify 32 GB executor memory.
However, whenever a task runs longer than approximately 15 minutes, all
the executors are lost, as if from some sort of timeout, regardless of whether
the task is using up the memory. We tried increasing spark.akka.timeout,
spark.akka.lookupTimeout, and spark.worker.timeout, but still no luck.
Besides, even if we just start a SparkContext and sit there instead of stopping
it, it still errors out with the exception below:

[error] o.a.s.s.TaskSchedulerImpl - Lost executor 0 on XXX06: remote Akka
client disassociated
[error] o.a.s.n.ConnectionManager - Corresponding SendingConnection to
ConnectionManagerId(XXX06.local,34307) not found
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 2 on XXX08: remote Akka
client disassociated
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 1 on XXX07: remote Akka
client disassociated
[error] o.a.s.n.SendingConnection - Exception while reading
SendingConnection to ConnectionManagerId(XXX1,56639)
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
~[na:1.7.0_60]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
~[na:1.7.0_60]
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
~[spark-core_2.10-1.1.0.jar:1.1.0]
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
[spark-core_2.10-1.1.0.jar:1.1.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_60]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_60]
[error] o.a.s.n.SendingConnection - Exception while reading
SendingConnection to ConnectionManagerId(XXX08.local,39914)
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
~[na:1.7.0_60]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
~[na:1.7.0_60]
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
~[spark-core_2.10-1.1.0.jar:1.1.0]
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
[spark-core_2.10-1.1.0.jar:1.1.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_60]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_60]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] -
[akka.tcp://sparkExecutor@XXX06:60653]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX06:60653]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX06:60653]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX06/10.40.31.51:60653
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] -
[akka.tcp://sparkExecutor@XXX06:61000]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX06:61000]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX06:61000]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX06/10.40.31.51:61000
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] -
[akka.tcp://sparkExecutor@XXX08:52949]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX08:52949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX08:52949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX08/10.40.31.53:52949
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] -
[akka.tcp://sparkExecutor@XXX08:36726]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX08:36726]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX08:36726]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX08/10.40.31.53:36726
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] -
[akka.tcp://sparkExecutor@XXX07:46516]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX07:46516]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@XXX07:46516]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: XXX07/10.40.31.52:46516
]
[error] a.r.EndpointWriter - AssociationError
[akka.tcp://sparkDriver@losbornev.local:35540] -
[akka.tcp://sparkExecutor@XXX07:48160]: Error [Association failed with
[akka.tcp://sparkExecutor@XXX07:48160]] [
akka.remote.EndpointAssociationException: Association failed with

Re: Invalid signature file digest for Manifest main attributes with spark job built using maven

2014-09-16 Thread Kevin Peng
Sean,

Thanks.  That worked.

Kevin

On Mon, Sep 15, 2014 at 3:37 PM, Sean Owen so...@cloudera.com wrote:

 This is more of a Java / Maven issue than Spark per se. I would use
 the shade plugin to remove signature files in your final META-INF/
 dir. As Spark does, in its configuration:

  <filters>
    <filter>
      <artifact>*:*</artifact>
      <excludes>
        <exclude>org/datanucleus/**</exclude>
        <exclude>META-INF/*.SF</exclude>
        <exclude>META-INF/*.DSA</exclude>
        <exclude>META-INF/*.RSA</exclude>
      </excludes>
    </filter>
  </filters>

 On Mon, Sep 15, 2014 at 11:33 PM, kpeng1 kpe...@gmail.com wrote:
  Hi All,
 
  I am trying to submit a spark job that I have built in maven using the
  following command:
  /usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain
  --master local[1] /home/cloudera/myjar.jar 100
 
  But I seem to be getting the following error:
  Exception in thread main java.lang.SecurityException: Invalid signature
  file digest for Manifest main attributes
  at
 
 sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
  at
 
 sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
  at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307)
  at java.util.jar.JarVerifier.update(JarVerifier.java:218)
  at java.util.jar.JarFile.initializeVerifier(JarFile.java:345)
  at java.util.jar.JarFile.getInputStream(JarFile.java:412)
  at
 sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
  at sun.misc.Resource.cachedInputStream(Resource.java:77)
  at sun.misc.Resource.getByteBuffer(Resource.java:160)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)
  at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289)
  at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
 
  Here is the pom file I am using to build the jar:
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
           http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spark</groupId>
    <artifactId>myjar</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>${project.artifactId}</name>
    <description>My wonderfull scala app</description>
    <inceptionYear>2010</inceptionYear>
    <licenses>
      <license>
        <name>My License</name>
        <url>http://</url>
        <distribution>repo</distribution>
      </license>
    </licenses>

    <properties>
      <cdh.version>cdh5.1.0</cdh.version>
      <maven.compiler.source>1.6</maven.compiler.source>
      <maven.compiler.target>1.6</maven.compiler.target>
      <encoding>UTF-8</encoding>
      <scala.tools.version>2.10</scala.tools.version>
      <scala.version>2.10.4</scala.version>
    </properties>

    <repositories>
      <repository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      </repository>
      <repository>
        <id>maven-hadoop</id>
        <name>Hadoop Releases</name>
        <url>https://repository.cloudera.com/content/repositories/releases/</url>
      </repository>
      <repository>
        <id>cloudera-repos</id>
        <name>Cloudera Repos</name>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      </repository>
    </repositories>
    <pluginRepositories>
      <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-tools Maven2 Repository</name>
        <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      </pluginRepository>
    </pluginRepositories>

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.0.0-${cdh.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-tools_2.10</artifactId>
        <version>1.0.0-${cdh.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.10</artifactId>

Re: Crawler and Scraper with different priorities

2014-09-09 Thread Peng Cheng
Hi Sandeep,

would you be interested in joining my open source project?

https://github.com/tribbloid/spookystuff

IMHO Spark is indeed not for general-purpose crawling, in which the distributed
jobs are highly homogeneous. But it is good enough for directional scraping,
which involves heterogeneous input and deep graph following & extraction. Please
drop me a line if you have a use case, as I'll try to integrate it as a
feature.

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-Scraper-with-different-priorities-tp13645p13838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming into HBase

2014-09-03 Thread Kevin Peng
Ted,

Here is the full stack trace coming from spark-shell:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming
job 1409786463000 ms.0

org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException:
org.apache.spark.streaming.StreamingContext

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)

at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)

at akka.actor.ActorCell.invoke(ActorCell.scala:456)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)

at akka.dispatch.Mailbox.run(Mailbox.scala:219)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Basically, on the terminal where I run nc -lk, I type in words separated by
commas and hit enter, e.g. bill,ted.


On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu yuzhih...@gmail.com wrote:

 Adding back user@

 I am not familiar with the NotSerializableException. Can you show the
 full stack trace ?

 See SPARK-1297 for changes you need to make so that Spark works with
 hbase 0.98

 Cheers


 On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng kpe...@gmail.com wrote:

 Ted,

 The hbase-site.xml is in the classpath (had worse issues before... until
 I figured that it wasn't in the path).

 I get the following error in the spark-shell:
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.streaming.StreamingContext
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
 ...

 I also double checked the hbase table, just in case, and nothing new is
 written in there.

 I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
 CDH5.1.0 distro.

 Thank you for the help.


 On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu yuzhih...@gmail.com wrote:

 Is hbase-site.xml in the classpath ?
 Do you observe any exception from the code below or in region server log
 ?

 Which hbase release are you using ?


 On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 kpe...@gmail.com wrote:

 I have been trying to understand how spark streaming and hbase connect,
 but
 have not been successful. What I am trying to do is given a spark
 stream,
 process that stream and store the results in an hbase table. So far
 this is
 what I have:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.storage.StorageLevel
 import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Get}
 import org.apache.hadoop.hbase.util.Bytes

 def blah(row: Array[String]) {
   val hConf = new HBaseConfiguration()
   val hTable = new HTable(hConf, "table")
   val thePut = new Put(Bytes.toBytes(row(0)))
   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
 Bytes.toBytes(row(0)))
   hTable.put(thePut)
 }

 val ssc = new StreamingContext(sc, Seconds(1))
 val lines = ssc.socketTextStream("localhost", ,
 StorageLevel.MEMORY_AND_DISK_SER)
 val words = lines.map(_.split(","))
 val store = words.foreachRDD(rdd => rdd.foreach(blah))
 ssc.start()

 I am currently running the above code in spark-shell. I am not sure
 what I
 am doing wrong.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
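
One way to keep driver-side objects such as the StreamingContext out of the task closure, and to avoid constructing an HTable per record, is to open the HBase connection once per partition inside foreachPartition. A sketch against the same 0.98-era HBase API as the snippet above; the table and column-family names ("table", "cf") are the placeholders used in the thread:

```scala
// Write each RDD partition to HBase with one HTable per partition, created
// inside the task so nothing unserializable is captured from the driver.
words.foreachRDD { rdd =>
  rdd.foreachPartition { rows =>
    val hConf = new HBaseConfiguration()
    val hTable = new HTable(hConf, "table")
    rows.foreach { row =>
      val thePut = new Put(Bytes.toBytes(row(0)))
      thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
      hTable.put(thePut)
    }
    hTable.close()  // flush buffered puts and release the connection
  }
}
```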

Re: Bug or feature? Overwrite broadcasted variables.

2014-08-19 Thread Peng Cheng
Unfortunately, after some research I found it's just a side effect of how a
closure containing a var works in Scala:
http://stackoverflow.com/questions/11657676/how-does-scala-maintains-the-values-of-variable-when-the-closure-was-defined

The closure keeps referring to the broadcasted wrapper var as a pointer until it
is shipped to the nodes, which is only triggered lazily. So you can't do this
after shipping has already started (e.g. change the broadcasted value in a new
thread while an action is running). It's neither a feature nor a bug, just an
illusion.

I would really like to see a non-blocking Broadcast.set() implemented; it would
make a lot of stochastic algorithms easier to write.


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
I'm curious to see that if you declare the broadcast wrapper as a var and
overwrite it in the driver program, the modification reliably affects all
transformations/actions that were defined BEFORE the overwrite but executed
lazily AFTER it:

   val a = sc.parallelize(1 to 10)

   var broadcasted = sc.broadcast("broad")

   val b = a.map(_ + broadcasted.value)
   // b.persist()
   for (line <- b.collect()) { print(line) }

   println("\n===")
   broadcasted = sc.broadcast("cast")

   for (line <- b.collect()) { print(line) }

the result is:

1broad2broad3broad4broad5broad6broad7broad8broad9broad10broad
===
1cast2cast3cast4cast5cast6cast7cast8cast9cast10cast

Of course, if you persist b before overwriting, you still get the unsurprising
result (both runs print the "broad" version, because the values are persisted).
This can be useful sometimes but may cause confusion at other times (people can
no longer add a persist at will just as a safeguard, because it may change the
result).

So far I've found no documentation covering this behavior, so can someone
confirm whether it's a deliberately designed feature?

Yours Peng 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315.html



Re: Bug or feature? Overwrite broadcasted variables.

2014-08-18 Thread Peng Cheng
Yeah, thanks a lot. I know that for people who understand lazy execution this
seems straightforward, but for those who don't it may become a liability.

I've only tested its stability on a small example (which seems stable);
hopefully that's not just luck. Can a committer confirm this?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bug-or-feature-Overwrite-broadcasted-variables-tp12315p12348.html



Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-27 Thread Peng Cheng
I give up; communication must be blocked by the complex EC2 network topology
(though the error message could indeed use some improvement). It doesn't make
sense to run a client thousands of miles away that communicates frequently with
the workers. I have moved everything to EC2 now.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html


Integrate spark-shell into officially supported web ui/api plug-in? What do you think?

2014-06-27 Thread Peng Cheng
This will be handy for demos and quick prototyping, as the command-line REPL
doesn't support many editor features; also, you wouldn't need to ssh into your
worker/master if your client is behind a NAT. Since the Spark codebase has a
minimalist design philosophy, I don't think this component can make it into the
main repository. However, it could be an independent project that is also
supported by the community (as Solr/Elasticsearch are to Lucene).

I've reviewed and tested a few REPL web ui including:
- Scala-notebook: https://github.com/Bridgewater/scala-notebook
- Tinsmiths: https://github.com/kouphax/tinsmith
- IScala: https://github.com/mattpap/IScala
- Codebrew: https://codebrew.io/

However, they are either too heavyweight or their ILoop is buried very deep
(sometimes even in another library). I'm interested in working on this part;
has anyone experimented with a similar solution before?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447.html


Re: Integrate spark-shell into officially supported web ui/api plug-in? What do you think?

2014-06-27 Thread Peng Cheng
That would be really cool with IPython, but I'm still wondering whether all the
language features I need are supported, these two in particular:
1. importing classes and an ILoop from external jars (so I can point it at
SparkILoop, or the Spark-binding ILoop of Apache Mahout, instead of Scala's
default ILoop)
2. implicit typecasts/wrappers and implicit variables (widely used in
SparkContext.scala)
I'll be able to start experimenting immediately if someone can confirm these
features.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447p8469.html


Re: Spark slave fail to start with wierd error information

2014-06-25 Thread Peng Cheng
Sorry, I just realized that start-slave is for a different task. Please close
this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-slave-fail-to-start-with-wierd-error-information-tp8203p8246.html


TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-25 Thread Peng Cheng
I'm running a very small job (16 partitions, 2 stages) on a 2-node cluster,
each with 15G memory, the master page looks all normal:

URL: spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077
Workers: 1
Cores: 2 Total, 2 Used
Memory: 13.9 GB Total, 512.0 MB Used
Applications: 1 Running, 0 Completed
Drivers: 0 Running, 1 Completed
Status: ALIVE
Workers

Id                                                        Address                             State  Cores       Memory
worker-20140625083124-ip-172-31-35-57.ec2.internal-54548  ip-172-31-35-57.ec2.internal:54548  ALIVE  2 (2 Used)  13.9 GB (512.0 MB Used)

Running Applications

ID                   Name                                            Cores  Memory per Node  Submitted Time       User  State    Duration
app-20140625083158-  org.tribbloid.spookystuff.example.GoogleImage$  2      512.0 MB         2014/06/25 08:31:58  peng  RUNNING  17 min

However, when submitting the job in client mode:

$SPARK_HOME/bin/spark-submit \
--class org.tribbloid.spookystuff.example.GoogleImage \
--master spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077 \
--deploy-mode client \
./../../../target/spookystuff-example-assembly-0.1.0-SNAPSHOT.jar

it is never picked up by any worker, even though 13.4 GB of memory and 2 cores
are available in total. The driver log repeatedly shows:

14/06/25 04:46:29 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory

Looks like it's either a bug or misleading information. Can someone confirm
this so I can submit a JIRA?
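
One thing worth ruling out first (my own assumption, not something established
in this thread): making the resource request explicit so the scheduler's demand
matches what the worker offers. Something along these lines, with illustrative
values rather than anything taken from the original post:

```shell
# Explicitly cap the resources requested per executor so a single
# 2-core / 13.9 GB worker can satisfy the request.
$SPARK_HOME/bin/spark-submit \
  --class org.tribbloid.spookystuff.example.GoogleImage \
  --master spark://ec2-54-88-40-125.compute-1.amazonaws.com:7077 \
  --deploy-mode client \
  --executor-memory 512m \
  --total-executor-cores 2 \
  ./../../../target/spookystuff-example-assembly-0.1.0-SNAPSHOT.jar
```

If the job still isn't scheduled with an explicit request the worker can
clearly satisfy, that would strengthen the case for a JIRA.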



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247.html

