[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-09-05 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-54686436
  
@freedafeng This PR was actually merged and will be available in Spark 1.1 
(which should be released _very_ soon).  
https://github.com/apache/spark/commit/f971d6cb60d642178d6544217a25fa16ece34889




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-09-05 Thread freedafeng
Github user freedafeng commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-54686288
  
Is anyone still working on this patch? PySpark + HBase is the key to our data 
science application, and I really hope it can work in the very near future.





[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-08-05 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-51220911
  
It will be in release 1.1.


You should be able to check out branch-1.1 and build from source, and it 
should work OK.

Otherwise, 1.1 should be released within a couple of weeks.
—
Sent from Mailbox

On Tue, Aug 5, 2014 at 4:15 PM, Russell Jurney 
wrote:

> @ericgarcia @srowen @MLnick 
> Unfortunately, when I follow those directions, I still get errors. It 
looks like I'll have to wait for this functionality until it's merged. Does anyone 
have any idea when that will be? Is there a release in mind that this would be 
a part of?
> ---
> Reply to this email directly or view it on GitHub:
> https://github.com/apache/spark/pull/455#issuecomment-51202880





[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-08-05 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-51202880
  
@ericgarcia @srowen @MLnick 

Unfortunately, when I follow those directions, I still get errors. It looks 
like I'll have to wait for this functionality until it's merged. Does anyone have 
any idea when that will be? Is there a release in mind that this would be a 
part of?







[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-29 Thread ericgarcia
Github user ericgarcia commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50541143
  
@rjurney For some reason I can't push my changes to a tagged release on 
GitHub. Here's the packaged distribution that I built: 
https://dl.dropboxusercontent.com/u/145006/spark-1.0.0-bin-1.0.4.tgz; it's built 
for Hadoop 1.0.

If you want to create your own modified 1.0.0 release, here is what I did: 
check out the tagged 1.0.0 release (https://github.com/apache/spark/tree/v1.0.0), 
check out this pull request as well, and copy the files from this pull request 
into the corresponding directories in the 1.0.0 release. Then create 
`org.apache.spark.api.python.AvroGenericConverter` in the same directory as the 
other `org.apache.spark.api.python` namespace files, change the namespace on 
`AvroGenericConverter` in the code accordingly, and build the project with 
`./make-distribution.sh`.

The modified code for loading the Avro files in PySpark is:

    avroRdd = sc.newAPIHadoopFile(filename,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.api.python.AvroGenericConverter")





[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-29 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50515663
  
@ericgarcia Could you please create a public branch with this code in a 
working state and push it to your clone of Spark so I can use that? I'm bad at 
resolving merge conflicts.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-29 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50515231
  
@rjurney Yes, in general I wouldn't expect patches to magically apply 
cleanly across versions. You may have to massage it, as with any merge conflict.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-29 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50514645
  
@ericgarcia @srowen Sorry, but again I can't get things working. I tried to pull 
the request into branch-1.0.0 via `git fetch origin pull/455/head:master`, but I 
get this:

From github.com:apache/spark
 ! [rejected]        refs/pull/455/head -> master  (non-fast-forward)
 * [new tag]         v0.9.2 -> v0.9.2
 * [new tag]         v1.0.2-rc1 -> v1.0.2-rc1
Russells-MacBook-Pro:spark-trunk rjurney$ git status

Which looks like it isn't applying?




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-29 Thread ericgarcia
Github user ericgarcia commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50501423
  
@rjurney I had the same problem with the master branch so I patched the 
relevant changes from this pull request into the 1.0.0 release, tossed the 
AvroGenericConverter class into 
core/src/main/scala/org/apache/spark/api/python, built it and it worked fine.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-29 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50451633
  
@rjurney This means you have two conflicting versions of Netty in the 
build. It may or may not have to do with the JIRA you cite, just because there 
are lots of Netties floating around and lots of ways they can collide. Maybe 
you can restate what version you are building, how you are building it at this 
point, and any modifications to the build? `mvn dependency:tree` will reveal 
where the Netties are coming from.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50425757
  
Cleaned, made no difference. See 
https://issues.apache.org/jira/browse/SPARK-1138 where others had this issue.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50425084
  
Hmmm, I didn't clean before rebuilding with CDH 4.4. Trying that now.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50424644
  
Ok, so now I rebuilt with my specific CDH version, and I get this when I 
run ./sbin/start-master.sh:

Spark Command: /usr/java/jdk1.8.0//bin/java -cp 
::/home/hivedata/spark/conf:/home/hivedata/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.4.0.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m 
org.apache.spark.deploy.master.Master --ip hivecluster2 --port 7077 
--webui-port 8080


Java HotSpot(TM) 64-Bit Server VM warning: ignoring option 
MaxPermSize=128m; support was removed in 8.0
14/07/28 18:33:37 INFO Master: Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
14/07/28 18:33:37 INFO Master: Registered signal handlers for [TERM, HUP, 
INT]
14/07/28 18:33:37 INFO SecurityManager: Changing view acls to: hivedata
14/07/28 18:33:37 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hivedata)
14/07/28 18:33:38 INFO Slf4jLogger: Slf4jLogger started
14/07/28 18:33:38 INFO Remoting: Starting remoting
14/07/28 18:33:38 ERROR ActorSystemImpl: Uncaught fatal error from thread 
[sparkMaster-akka.actor.default-dispatcher-2] shutting down ActorSystem 
[sparkMaster]
java.lang.VerifyError: (class: 
org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker 
signature: 
(Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;)
 Wrong return type in function
at 
akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:282)
at 
akka.remote.transport.netty.NettyTransport.(NettyTransport.scala:239)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:161)
at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at 
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:200)
at 
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at 
akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
at 
akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/28 18:33:38 INFO RemoteActorRefProvider$RemotingTerminator: Shutting 
down remote daemon.
14/07/28 18:33:38 INFO RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
14/07/28 18:33:38 INFO RemoteActorRefProvider$RemotingTerminator: Remoting 
shut down.
Exception in thread "main" java.util.concurrent.TimeoutException: Futures 
timed out after [1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50422374
  
@srowen Actually yes, I'm that stupid :) Figured it out on my own though; I 
have it building across the cluster now.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50422265
  
You're not literally writing 4.X.X in the version, are you?




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50422135
  
@JoshRosen 

Huh, I'm going by 
http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html

and I get:

sbt.ResolveException: unresolved dependency: 
org.apache.hadoop#hadoop-client;2.0.0-mr1-cdh4.X.X: not found




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50422022
  
@JoshRosen Actually, I just did: sbt/sbt assembly publish-local

Trying again with: SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.X.X sbt/sbt assembly 
publish-local




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50421644
  
@rjurney 

Did you build Spark against your cluster's Hadoop version?  Do you mind 
posting the build command that you used?

> Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
> : java.lang.ClassCastException: org.apache.avro.mapreduce.AvroKeyInputFormat cannot be cast to org.apache.hadoop.mapred.InputFormat

You need to use `newAPIHadoopFile`, not `hadoopFile`, since 
`mapreduce.InputFormat`s can only be used with that API.
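
For context, the same split is visible on the Scala side, where the type bounds 
make it explicit: `hadoopFile` is bound to the old `org.apache.hadoop.mapred.InputFormat`, 
while `newAPIHadoopFile` is bound to `org.apache.hadoop.mapreduce.InputFormat`, 
which is what `AvroKeyInputFormat` extends. A minimal sketch, not code from this 
thread (the object and method names are invented):

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object AvroLoadSketch {
  // Compiles because AvroKeyInputFormat is a "new API" (mapreduce) InputFormat.
  // The equivalent sc.hadoopFile[...] call would be rejected, since its format
  // type parameter must extend the old mapred.InputFormat.
  def loadAvro(sc: SparkContext, path: String): RDD[(AvroKey[GenericRecord], NullWritable)] =
    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path)
}
```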




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50420550
  
@ericgarcia I am running CDH 4.4 with MapReduce 1, and when I run this on a 
cluster running the Spark master in standalone mode:

    avroRdd = sc.newAPIHadoopFile("hdfs://hivecluster2:8020/e8/dev/web_proxy_mef/2014/07/28",
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroGenericConverter")

The error is the well-known Hadoop 1 vs. Hadoop 2 issue:

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
    : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4

So I also tried hadoopFile instead of newAPIHadoopFile. I run:

    avroRdd = sc.hadoopFile("hdfs://hivecluster2:8020/e8/dev/web_proxy_mef/2014/07/28/20/part-m-0.avro",
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroGenericConverter")

And I get:

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
    : java.lang.ClassCastException: org.apache.avro.mapreduce.AvroKeyInputFormat cannot be cast to org.apache.hadoop.mapred.InputFormat

What am I to do? :(




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-28 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50419281
  
Not sure if it's related, but when I try to start workers having built from 
trunk, I get this: 

   [hivedata@hivecluster2 spark]$ ./bin/spark-class 
org.apache.spark.deploy.worker.Worker spark://hivecluster2:8080
   Java HotSpot(TM) 64-Bit Server VM warning: ignoring option 
MaxPermSize=128m; support was removed in 8.0
   14/07/28 17:05:48 INFO Worker: Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
   14/07/28 17:05:48 INFO Worker: Registered signal handlers for [TERM, 
HUP, INT]
14/07/28 17:05:49 INFO SecurityManager: Changing view acls to: hivedata
14/07/28 17:05:49 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hivedata)
14/07/28 17:05:49 INFO Slf4jLogger: Slf4jLogger started
14/07/28 17:05:49 INFO Remoting: Starting remoting
14/07/28 17:05:49 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkWorker@hivecluster2:36500]
14/07/28 17:05:49 INFO Worker: Starting Spark worker hivecluster2:36500 
with 8 cores, 14.6 GB RAM
14/07/28 17:05:49 INFO Worker: Spark home: /home/hivedata/spark
14/07/28 17:05:50 INFO WorkerWebUI: Started WorkerWebUI at 
http://hivecluster2:8081
14/07/28 17:05:50 INFO Worker: Connecting to master 
spark://hivecluster2:8080...
14/07/28 17:05:50 ERROR EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@hivecluster2:36500] -> 
[akka.tcp://sparkMaster@hivecluster2:8080]: Error [Association failed with 
[akka.tcp://sparkMaster@hivecluster2:8080]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkMaster@hivecluster2:8080]
Caused by: akka.remote.transport.AkkaProtocolException: The remote system 
explicitly disassociated (reason unknown).
]
14/07/28 17:05:50 INFO LocalActorRef: Message 
[akka.remote.transport.AssociationHandle$Disassociated] from 
Actor[akka://sparkWorker/deadLetters] to 
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40hivecluster2%3A8080-1#-396559240]
 was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
14/07/28 17:05:50 ERROR EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@hivecluster2:36500] -> 
[akka.tcp://sparkMaster@hivecluster2:8080]: Error [Association failed with 
[akka.tcp://sparkMaster@hivecluster2:8080]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkMaster@hivecluster2:8080]
Caused by: akka.remote.transport.AkkaProtocolException: The remote system 
explicitly disassociated (reason unknown).
]
14/07




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-24 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50111706
  
You'd need to run the loading code off the master branch. It should be in the 1.1 
release in a few weeks.
—
Sent from Mailbox

On Fri, Jul 25, 2014 at 4:14 AM, Russell Jurney 
wrote:

> I got this to run and I'm able to get work done!
> Does this code have to be run on the latest Spark code? Would it run on 
1.0?
> On Tuesday, July 22, 2014, Eric Garcia  wrote:
>> @MLnick , I made a PR here: #1536
>> 
>> @rjurney , the updated code works for the
>> .avro file you posted though it is still not fully implemented for *all*
>> data types. Note that any null values in your data will show up as an 
empty
>> string "". For some reason I could not get Java null to convert to Python
>> None.
>>
>> —
>> Reply to this email directly or view it on GitHub
>> .
>>
> -- 
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com 
datasyndrome.com
> ---
> Reply to this email directly or view it on GitHub:
> https://github.com/apache/spark/pull/455#issuecomment-50101315




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-24 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50101315
  
I got this to run and I'm able to get work done!

Does this code have to be run on the latest Spark code? Would it run on 1.0?

On Tuesday, July 22, 2014, Eric Garcia  wrote:

> @MLnick , I made a PR here: #1536
> 
> @rjurney , the updated code works for the
> .avro file you posted though it is still not fully implemented for *all*
> data types. Note that any null values in your data will show up as an 
empty
> string "". For some reason I could not get Java null to convert to Python
> None.
>
> —
> Reply to this email directly or view it on GitHub
> .
>


-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-24 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-50084602
  
When I load my records, I get:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkDriverExecutionException: Execution error
at 
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:585)
at 
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:563)
Caused by: scala.MatchError: UNION (of class org.apache.avro.Schema$Type)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter.org$apache$spark$examples$pythonconverters$AvroGenericConverter$$unpack$1(AvroGenericConverter.scala:24)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter$$anonfun$unpackRecord$1$1.apply(AvroGenericConverter.scala:17)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter$$anonfun$unpackRecord$1$1.apply(AvroGenericConverter.scala:17)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter.unpackRecord$1(AvroGenericConverter.scala:17)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter.convert(AvroGenericConverter.scala:36)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter.convert(AvroGenericConverter.scala:12)
at 
org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:126)
at 
org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:126)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1068)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1068)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
at 
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:578)
... 1 more





[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-22 Thread ericgarcia
Github user ericgarcia commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49792170
  
@MLnick, I made a PR here: https://github.com/apache/spark/pull/1536
@rjurney, the updated code works for the .avro file you posted though it is 
still not fully implemented for _all_ data types. Note that any null values in 
your data will show up as an empty string "". For some reason I could not get 
Java null to convert to Python None. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-22 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49716773
  
@ericgarcia hey that's cool. Would you mind creating a pull request to add 
that to the Spark examples? It would be very useful to others to see how to 
make a fully generic Avro converter.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49700443
  
@ericgarcia Thanks! Very exciting. An example file is here: 
https://drive.google.com/file/d/0B3wy0wXNwbpRekJVaW13cGRKb1U/edit?usp=sharing




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread ericgarcia
Github user ericgarcia commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49699078
  
@rjurney Nice job adding the DOUBLE and FLOAT. I might be able to get the 
UNIONS working if you post an example .avro file for me to test it with. I 
neglected the data structures that I didn't happen to be using.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49698499
  
This file, without any UNIONS, works: 
https://github.com/miguno/avro-cli-examples/blob/master/twitter.snappy.avro

My data is more complex :(




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49698081
  
It also looks like we need a custom function to handle the UNION type. I've 
extended what you wrote for DOUBLE/FLOAT:

def unpack(value: Any, schema: Schema): Any = schema.getType match {
  case STRING => value.asInstanceOf[java.lang.String]
  case ENUM => value.toString
  case LONG => value.asInstanceOf[java.lang.Long]
  case INT => value.asInstanceOf[java.lang.Integer]
  case FLOAT => value.asInstanceOf[java.lang.Float]
  case DOUBLE => value.asInstanceOf[java.lang.Double]
  case ARRAY => unpackArray(value, schema.getElementType)
  case RECORD => unpackRecord(value.asInstanceOf[GenericRecord])
  case _ => value.toString
}
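
The extension above adds DOUBLE and FLOAT but still has no UNION branch, which is 
where the `scala.MatchError: UNION` shown elsewhere in this thread comes from. 
Below is a sketch of one way a UNION case might be handled, using Avro's 
`GenericData.resolveUnion` to pick the branch that matches the datum. This is not 
code from the PR, the object name is invented, and whether a null passed through 
like this actually surfaces as Python `None` is an assumption rather than something 
verified in the thread.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

object UnionUnpackSketch {
  // Resolve which branch of a union (e.g. ["null", "string"]) the datum
  // belongs to, then delegate back to the caller's unpack for that branch.
  // A null datum is passed through instead of being stringified.
  def unpackUnion(value: Any, union: Schema)(unpack: (Any, Schema) => Any): Any =
    if (value == null) null
    else {
      val branch = GenericData.get().resolveUnion(union, value.asInstanceOf[AnyRef])
      unpack(value, union.getTypes.get(branch))
    }
}
```

Wired into the match above, this would look roughly like `case NULL => null` and 
`case UNION => UnionUnpackSketch.unpackUnion(value, schema)(unpack)`.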




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49697592
  
My data has doubles in it, could that be the issue?

Using Python version 2.7.6rc1 (v2.7.6rc1:4913d0e9be30+, Oct 27 2013 
20:52:11)
SparkContext available as sc.

In [1]: avroRdd = sc.newAPIHadoopFile('/tmp/part-m-0.avro', 
   ...:     "org.apache.avro.mapreduce.AvroKeyInputFormat", 
   ...:     "org.apache.avro.mapred.AvroKey", 
   ...:     "org.apache.hadoop.io.NullWritable",
   ...:     keyConverter="org.apache.spark.examples.pythonconverters.AvroGenericConverter")
14/07/21 21:38:48 INFO MemoryStore: ensureFreeSpace(56082) called with 
curMem=0, maxMem=309225062
14/07/21 21:38:48 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 54.8 KB, free 294.8 MB)
14/07/21 21:38:48 INFO Converter: Loaded converter: 
org.apache.spark.examples.pythonconverters.AvroGenericConverter
14/07/21 21:38:48 INFO FileInputFormat: Total input paths to process : 1
14/07/21 21:38:48 INFO SparkContext: Starting job: first at 
SerDeUtil.scala:69
14/07/21 21:38:48 INFO DAGScheduler: Got job 0 (first at 
SerDeUtil.scala:69) with 1 output partitions (allowLocal=true)
14/07/21 21:38:48 INFO DAGScheduler: Final stage: Stage 0(first at 
SerDeUtil.scala:69)
14/07/21 21:38:48 INFO DAGScheduler: Parents of final stage: List()
14/07/21 21:38:48 INFO DAGScheduler: Missing parents: List()
14/07/21 21:38:48 INFO DAGScheduler: Computing the requested partition 
locally
14/07/21 21:38:48 INFO NewHadoopRDD: Input split: 
file:/tmp/part-m-0.avro:0+1996482
14/07/21 21:38:48 WARN AvroKeyInputFormat: Reader schema was not set. Use 
AvroJob.setInputKeySchema() if desired.
14/07/21 21:38:48 INFO AvroKeyInputFormat: Using a reader schema equal to 
the writer schema.
14/07/21 21:38:48 INFO DAGScheduler: Failed to run first at 
SerDeUtil.scala:69
---
Py4JJavaError Traceback (most recent call last)
 in ()
  3 "org.apache.avro.mapred.AvroKey",
  4 "org.apache.hadoop.io.NullWritable",
> 5 
keyConverter="org.apache.spark.examples.pythonconverters.AvroGenericConverter")

/Users/rjurney/Software/spark-trunk/python/pyspark/context.pyc in 
newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, 
keyConverter, valueConverter, conf)
402 jconf = self._dictToJavaMap(conf)
403 jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, 
path, inputFormatClass, keyClass,
--> 404 valueClass, 
keyConverter, valueConverter, jconf)
405 return RDD(jrdd, self, PickleSerializer())
406 


/Users/rjurney/Software/spark-trunk/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538 
539 for temp_arg in temp_args:


/Users/rjurney/Software/spark-trunk/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.spark.SparkDriverExecutionException: Execution error
at 
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:585)
at 
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:563)
Caused by: java.lang.NullPointerException
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter.org$apache$spark$examples$pythonconverters$AvroGenericConverter$$unpack$1(AvroGenericConverter.scala:31)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter$$anonfun$unpackRecord$1$1.apply(AvroGenericConverter.scala:17)
at 
org.apache.spark.examples.pythonconverters.AvroGenericConverter$$anonfun$unpackRecord$1$1.apply(AvroGenericConverter.scala:17)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread ericgarcia
Github user ericgarcia commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49687447
  
@rjurney I saved the above to 
`spark/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroGenericConverter.scala`, 
rebuilt Spark with 
`./sbt/sbt assembly` 
and then loaded pyspark with 
`SPARK_CLASSPATH=examples/target/scala-2.10/spark-examples-1.1.0-SNAPSHOT-hadoop1.0.4.jar IPYTHON=1 bin/pyspark` 
Inside of pyspark you should be able to load up your Avro file with:

    avroRdd = sc.newAPIHadoopFile("<path to your .avro file>", 
        "org.apache.avro.mapreduce.AvroKeyInputFormat", 
        "org.apache.avro.mapred.AvroKey", 
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroGenericConverter")

Let me know if it works for you : )




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49684527
  
@ericgarcia This is awesome. How can I test this code out? I can handle 
patching trunk, but how do I call the converter?




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-07-21 Thread ericgarcia
Github user ericgarcia commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-49681825
  
@rjurney Thanks for your question on Avro and PySpark. @MLnick I used your 
code as a starting point to successfully read a few different Avro data types 
in PySpark instead of merely a String. This could be added in alongside 
HBaseConverter.scala and CassandraConverters.scala:

package org.apache.spark.examples.pythonconverters

import org.apache.spark.api.python.Converter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import collection.JavaConversions._
import org.apache.avro.Schema.Field
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._

class AvroGenericConverter extends Converter[AvroKey[GenericRecord], java.util.Map[String, Any]] {
  override def convert(obj: AvroKey[GenericRecord]): java.util.Map[String, Any] = {
    val record = obj.datum()

    def unpackRecord(record: GenericRecord): java.util.Map[String, Any] = {
      mapAsJavaMap(record.getSchema.getFields.map( f => (f.name, unpack(record.get(f.name), f.schema)) ).toMap)
    }

    def unpackArray(value: Any, schema: Schema): java.util.List[Any] = {
      bufferAsJavaList(value.asInstanceOf[java.util.List[Any]].map( v => unpack(v, schema)))
    }

    def unpack(value: Any, schema: Schema): Any = schema.getType match {
      case STRING => value.asInstanceOf[java.lang.String]
      case ENUM => value.toString
      case LONG => value.asInstanceOf[java.lang.Long]
      case INT => value.asInstanceOf[java.lang.Integer]
      case ARRAY => unpackArray(value, schema.getElementType)
      case RECORD => unpackRecord(value.asInstanceOf[GenericRecord])
      case _ => value.toString
    }

    unpackRecord(record)
  }
}




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-22 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46810658
  
Hmmm, not sure - master built fine for me at the time I posted. Either try 
pulling again, or maybe check out my branch and try that: 
https://github.com/MLnick/spark-1/tree/pyspark-inputformats




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-22 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46789138
  
Thanks, master doesn't build for me. Is there a particular commit you
recommend using?

[error]

[error]   last tree to typer:
Literal(Constant(org.apache.spark.sql.catalyst.types.PrimitiveType))

[error]   symbol: null

[error]symbol definition: null

[error]  tpe:
Class(classOf[org.apache.spark.sql.catalyst.types.PrimitiveType])

[error]symbol owners:

[error]   context owners: object TestSQLContext -> package test

[error]

[error] == Enclosing template or block ==

[error]

[error] Template( // val :  in object
TestSQLContext, tree.tpe=org.apache.spark.sql.test.TestSQLContext.type

[error]   "org.apache.spark.sql.SQLContext" // parents

[error]   ValDef(

[error] private

[error] "_"

[error] 

[error] 

[error]   )

[error]   // 2 statements

[error]   DefDef( // private def readResolve(): Object in object
TestSQLContext

[error]  private 

[error] "readResolve"

[error] []

[error] List(Nil)

[error]  // tree.tpe=Object

[error] test.this."TestSQLContext" // object TestSQLContext in package
test, tree.tpe=org.apache.spark.sql.test.TestSQLContext.type

[error]   )

[error]   DefDef( // def ():
org.apache.spark.sql.test.TestSQLContext.type in object TestSQLContext

[error] 

[error] ""

[error] []

[error] List(Nil)

[error]  // tree.tpe=org.apache.spark.sql.test.TestSQLContext.type

[error] Block( // tree.tpe=Unit

[error]   Apply( // def (sparkContext:
org.apache.spark.SparkContext): org.apache.spark.sql.SQLContext in class
SQLContext, tree.tpe=org.apache.spark.sql.SQLContext

[error] TestSQLContext.super."" // def (sparkContext:
org.apache.spark.SparkContext): org.apache.spark.sql.SQLContext in class
SQLContext, tree.tpe=(sparkContext:
org.apache.spark.SparkContext)org.apache.spark.sql.SQLContext

[error] Apply( // def (master: String,appName: String,conf:
org.apache.spark.SparkConf): org.apache.spark.SparkContext in class
SparkContext, tree.tpe=org.apache.spark.SparkContext

[error]   new org.apache.spark.SparkContext."" // def
(master: String,appName: String,conf: org.apache.spark.SparkConf):
org.apache.spark.SparkContext in class SparkContext, tree.tpe=(master:
String, appName: String, conf:
org.apache.spark.SparkConf)org.apache.spark.SparkContext

[error]   // 3 arguments

[error]   "local"

[error]   "TestSQLContext"

[error]   Apply( // def (): org.apache.spark.SparkConf in
class SparkConf, tree.tpe=org.apache.spark.SparkConf

[error] new org.apache.spark.SparkConf."" // def
(): org.apache.spark.SparkConf in class SparkConf,
tree.tpe=()org.apache.spark.SparkConf

[error] Nil

[error]   )

[error] )

[error]   )

[error]   ()

[error] )

[error]   )

[error] )

[error]

[error] == Expanded type of tree ==

[error]

[error] ConstantType(

[error]   value =
Constant(org.apache.spark.sql.catalyst.types.PrimitiveType)

[error] )

[error]

[error] uncaught exception during compilation: java.lang.AssertionError

java.lang.AssertionError: assertion failed: List(object package$DebugNode,
object package$DebugNode)

at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)

at

scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988)

at

scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991)

at
scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371)

at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)

at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)

at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)

at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)

at scala.tools.nsc.Global$Run.compile(Global.scala:1662)

at xsbt.CachedCompiler0.run(CompilerInterface.scala:123)

at xsbt.CachedCompiler0.run(CompilerInterface.scala:99)

at xsbt.CompilerInterface.run(CompilerInterface.scala:27)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
 

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-21 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46773577
  
1.1 is not released yet. This PR is in master but not in 1.0 (it may be 
released in 1.0.1, or if not, then in 1.1).

So you'll have to clone master and run `sbt/sbt publish-local`, which will 
publish the Maven and sbt artifacts to your local repos.



—
Sent from Mailbox

On Sun, Jun 22, 2014 at 1:22 AM, Russell Jurney 
wrote:

> Thanks a ton! One thing - how can I pull spark core 1.1 from maven?
> [ERROR] Failed to execute goal on project avro: Could not resolve
> dependencies for project example:avro:jar:0.1: Could not find artifact
> org.apache.spark:spark-core_2.10:jar:1.1.0-SNAPSHOT in scala-tools.org (
> http://scala-tools.org/repo-releases) -> [Help 1]
> On Fri, Jun 20, 2014 at 10:45 PM, MLnick  wrote:
>> @rjurney  this works for me (building Spark
>> from current master): https://gist.github.com/MLnick/5864741781b9340cb211
>>
>> if you run mvn package and then add that to SPARK_CLASSPATH and use it in
>> IPython console.
>>
>> However it seems to come through as only strings (not a dict). I verified
>> that if I take only the string field and explicitly convert to string 
(ie Map[String,
>> String]) then it works. I suspect then that Avro doesn't have the type
>> information at all, so Pyrolite cannot pickle it. I guess you might have 
to
>> do something more in depth in the AvroConverter to read the type info
>> from the Avro schema and do a cast...
>>
>> —
>> Reply to this email directly or view it on GitHub
>> .
>>
> -- 
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com 
datasyndrome.com
> ---
> Reply to this email directly or view it on GitHub:
> https://github.com/apache/spark/pull/455#issuecomment-46767642


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-21 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46767642
  
Thanks a ton! One thing - how can I pull Spark core 1.1 from Maven?

[ERROR] Failed to execute goal on project avro: Could not resolve
dependencies for project example:avro:jar:0.1: Could not find artifact
org.apache.spark:spark-core_2.10:jar:1.1.0-SNAPSHOT in scala-tools.org (
http://scala-tools.org/repo-releases) -> [Help 1]


On Fri, Jun 20, 2014 at 10:45 PM, MLnick  wrote:

> @rjurney  this works for me (building Spark
> from current master): https://gist.github.com/MLnick/5864741781b9340cb211
>
> if you run mvn package and then add that to SPARK_CLASSPATH and use it in
> IPython console.
>
> However it seems to come through as only strings (not a dict). I verified
> that if I take only the string field and explicitly convert to string (ie 
Map[String,
> String]) then it works. I suspect then that Avro doesn't have the type
> information at all, so Pyrolite cannot pickle it. I guess you might have 
to
> do something more in depth in the AvroConverter to read the type info
> from the Avro schema and do a cast...
>
> —
> Reply to this email directly or view it on GitHub
> .
>



-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com datasyndrome.com




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-20 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46745394
  
@rjurney this works for me (building Spark from current master): 
https://gist.github.com/MLnick/5864741781b9340cb211

It works if you run ```mvn package``` and then add the resulting jar to 
```SPARK_CLASSPATH``` and use it in the IPython console.

However, it seems to come through as only strings (not a dict). I verified 
that if I take only the string field and explicitly convert it to a string (i.e. 
```Map[String, String]```) then it works. I suspect then that Avro doesn't have 
the type information at all, so Pyrolite cannot pickle it. I guess you might 
have to do something more in depth in the ```AvroConverter``` to read the type 
info from the Avro schema and do a cast...




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-20 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46731898
  
That sounds awesome, but can you put this into context a little bit, in 
terms of where I would put that code and how I would run it?




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-20 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46656191
  
@rjurney it should work but you'll need to write a converter like:

```scala
class AvroConverter extends Converter[AvroKey[GenericRecord], java.util.Map[String, Any]] {
  import collection.JavaConversions._

  override def convert(obj: AvroKey[GenericRecord]): java.util.Map[String, Any] = {
    val record = obj.datum()
    val schema = record.getSchema
    mapAsJavaMap(schema.getFields.map( f => (f.name, record.get(f.name)) ).toMap)
  }
}
```




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-19 Thread rjurney
Github user rjurney commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-46638552
  
It would be really helpful if this enabled loading of Avro data via 
GenericRecord.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-11 Thread kanzhang
Github user kanzhang commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45827287
  
@MLnick thanks for the tip, Nick! I'll give it a try and ask you to review.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-11 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45803692
  
I had planned to take a look at that next. If you want to take a crack at 
it I'm happy to review.


You'd have to more or less do things in reverse. The PySpark code for Spark SQL 
does something similar to what would be required, i.e. it takes an RDD of bytes 
(pickled objects from Python) and deserializes them to Java objects. Finally, 
you'll need to convert those to Writables so you can use the relevant output 
format.
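
A very rough sketch of that shape, just to make it concrete (assumes Pyrolite's Unpickler, batches pickled as Python lists of tuples, and a simple Text/Text SequenceFile as the target; none of this is the eventual implementation):

```scala
import java.util.{ArrayList => JArrayList}

import net.razorvine.pickle.Unpickler
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// pickled: what PySpark hands over - each element is a pickled batch of (key, value) pairs
def savePickledAsSequenceFile(pickled: RDD[Array[Byte]], path: String): Unit = {
  val kv = pickled.mapPartitions { iter =>
    val unpickler = new Unpickler()
    iter.flatMap { bytes =>
      import collection.JavaConversions._
      // Pyrolite turns a pickled Python list of tuples into an ArrayList of Object arrays
      unpickler.loads(bytes).asInstanceOf[JArrayList[Any]].map {
        case pair: Array[_] => (pair(0), pair(1))
      }
    }
  }
  // last step: convert to Writables so the relevant OutputFormat can write them out
  kv.map { case (k, v) => (new Text(k.toString), new Text(v.toString)) }
    .saveAsNewAPIHadoopFile(path, classOf[Text], classOf[Text],
      classOf[SequenceFileOutputFormat[Text, Text]])
}
```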






—
Sent from Mailbox

On Wed, Jun 11, 2014 at 1:55 AM, kanzhang 
wrote:

> @MLnick hey Nick, are you planning on implementing the reverse direction 
- ```saveAsSequenceFile``` and ```saveAsHadoopFile```? If not, I'd like to give 
it a shot. 
> ---
> Reply to this email directly or view it on GitHub:
> https://github.com/apache/spark/pull/455#issuecomment-45687030




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-10 Thread kanzhang
Github user kanzhang commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45687030
  
@MLnick hey Nick, are you planning on implementing the reverse direction - 
```saveAsSequenceFile``` and ```saveAsHadoopFile```? If not, I'd like to give 
it a shot. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/455




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-09 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45574901
  
Thanks Nick for the latest changes and Patrick for the comments! I've 
merged this in now. This will be an awesome feature to have.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45439532
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45439535
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15544/




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45437963
  
Merged build started. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45437958
  
 Merged build triggered. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-08 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45432289
  
As an aside, I am generally -1 on adding a lot of format-specific reading/writing 
code to Spark core.

My view is that this is why InputFormat/OutputFormat support is there - to 
provide that custom read/write functionality. It makes sense for something 
like Parquet with SparkSQL as the preferred format for efficiency (in much the 
same way as SequenceFiles are often the preferred format in many Hadoop 
pipelines), but should Spark core contain standardised .saveAsXXXFile methods 
for every format? IMO, no - the examples show how to do things with common formats.

I can see why providing contrib modules for reading/writing structured 
(RDBMS-like) data via common formats for SparkSQL makes sense, as there will 
probably be one "correct" way of doing this.

But looking at the HBase PR you referenced, I don't see the value of having 
that live in Spark. And why does it not simply use an ```OutputFormat``` 
instead of custom config and write code? (I might be missing something here, 
but it seems to add complexity and maintenance burden unnecessarily.)
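
(For concreteness, writing to HBase through the stock ```OutputFormat``` machinery from Scala looks roughly like the sketch below; the table and column names are made up, and the calls reflect the HBase/Hadoop APIs of that era.)

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// write (rowKey, value) pairs into a hypothetical table "mytable", column family "cf"
def writeToHBase(rdd: RDD[(String, String)]): Unit = {
  val conf = HBaseConfiguration.create()
  conf.set(TableOutputFormat.OUTPUT_TABLE, "mytable")

  val job = Job.getInstance(conf)
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Put])

  rdd.map { case (rowKey, value) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
  }.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
```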




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-08 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45432064
  
@pwendell thanks for the comments. Will make those amendments.

I get that SparkSQL will be the preferred way of doing things for 
structured data in general, so happy to mark this feature as advanced and 
experimental, and have it superseded by the relevant SparkSQL functionality 
later.

Having said this, I see this as simply adding missing API functionality to 
PySpark. All it does is bring PySpark on a par with Scala/Java in terms of 
being able to read from any ```InputFormat```. The vast majority of Spark users 
end up using ```textFile```, with others probably using ```SequenceFile``` and 
other common formats (such as HBase, Cassandra, Parquet, etc.). I expect that to 
be the case here too - most users won't generally use this functionality, but some 
with more special or advanced cases will (and some users do have weird and 
custom InputFormats and non-structured data).

The ```Converter``` interface is only really used when the data being read 
is of some special format (such as HBase and Cassandra binary data). For 
example, reading Elasticsearch "just works" and uses the standard converter to 
convert the Writables to primitives. So using ES with SchemaRDD is as easy as 
below (and this is exactly how it would work in Scala/Java too):

```python
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

# assume Elasticsearch is running on localhost defaults
conf = {"es.resource": "index/type"}
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                         "org.apache.hadoop.io.NullWritable",
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=conf)
rdd.first()   # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

dicts = rdd.map(lambda x: x[1])
table = sqlCtx.inferSchema(dicts)
table.registerAsTable("table")
```

@mateiz I agree that the ```Converter``` interface is only an extension 
point for cases where the InputFormat produces data that is not 
Pickleable (or is, e.g., binary data that needs to be extracted), and that we 
shouldn't add any converters to Spark itself - the Cassandra and HBase ones are just 
examples of what can be done.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45421855
  
@mateiz I see - if this is required for supporting arbitrary input types, I 
guess it's the only way. I'm hoping that in the future most people won't have 
to write their own converters because we'll support the most common storage 
layers. So I'll withdraw my concern.

@MLnick thanks for responding to the earlier feedback; I think the patch is 
looking good. I made two minor comments - if you can address those, then it's 
good with me.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13521516
  
--- Diff: docs/programming-guide.md ---
@@ -378,11 +378,88 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
-
+### SequenceFile and Hadoop InputFormats
+
+In addition to reading text files, PySpark supports reading 
[SequenceFile](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html)
 
+and any arbitrary 
[InputFormat](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/InputFormat.html).
+
+ Writable Support
+
+PySpark SequenceFile support loads an RDD within Java, and pickles the 
resulting Java objects using
+[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables 
are automatically converted:
+
+
+Writable Type                                      Python Type
+Text                                               unicode str
+IntWritable                                        int
+FloatWritable                                      float
+DoubleWritable                                     float
+BooleanWritable                                    bool
+BytesWritable                                      bytearray
+NullWritable                                       None
+ArrayWritable                                      list of primitives, or tuple of objects
+MapWritable                                        dict
+Custom Class conforming to Java Bean conventions   dict of public properties (via JavaBean getters and setters) + __class__ for the class type
+
+
+ Loading SequenceFiles
+
+Similarly to text files, SequenceFiles can be loaded by specifying the 
path. The key and value
+classes can be specified, but for standard Writables it should work 
without requiring this.
+
+{% highlight python %}
+>>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
+>>> rdd.collect() # this example has DoubleWritable keys and Text 
values
+[(1.0, u'aa'),
+ (2.0, u'bb'),
+ (2.0, u'aa'),
+ (3.0, u'cc'),
+ (2.0, u'bb'),
+ (1.0, u'aa')]
+{% endhighlight %}
+
+ Loading Arbitrary Hadoop InputFormats
+
+PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' 
Hadoop APIs. If required,
+a Hadoop configuration can be passed in as a Python dict. Here is an 
example using the
+Elasticsearch ESInputFormat:
+
+{% highlight python %}
+$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
+>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is 
running on localhost defaults
+>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
+"org.apache.hadoop.io.NullWritable", 
"org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
+>>> rdd.first() # the result is a MapWritable that is converted to 
a Python dict
+(u'Elasticsearch ID',
+ {u'field1': True,
+  u'field2': u'Some Text',
+  u'field3': 12345})
+{% endhighlight %}
 
+Note that, if the InputFormat simply depends on a Hadoop configuration 
and/or input path, and
+the key and value classes can easily be converted according to the above 
table,
+then this approach should work well for such cases.
+
+If you have custom serialized binary data (like pulling data from 
Cassandra / HBase) or custom
+classes that don't conform to the JavaBean requirements, then you will 
first need to 
+transform that data on the Scala/Java side to something which can be 
handled by Pyrolite's pickler.
+A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) 
trait is provided 
+for this. Simply extend this trait and implement your transformation code 
in the ```convert``` 
+method. The ensure this class is packaged into your Spark job jar and 
included on the PySpark 
--- End diff --

Typo: "The ensure".

Also, I'd mention that they need to include both the `Converter` 
implementation and any dependencies required to read from the storage 
system (e.g. `hbase-client`); those actually aren't packaged with Spark by 
default, except in the examples.



[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13521493
  
--- Diff: docs/programming-guide.md ---
@@ -378,11 +378,88 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
-
+### SequenceFile and Hadoop InputFormats
--- End diff --

Further up in this guide there is a statement that says:

```
The current API is limited to text files, but support for binary Hadoop 
InputFormats is expected in future versions.
```

Given this patch, it probably makes sense to remove that :)




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45420441
  
(Also, once Spark SQL supports a given interface, or even before, we can 
simply leave out patches that add new Converters. There's no need to provide a 
huge collection of them; in fact, I'd provide none beyond the default one. The 
examples here are meant to show how to write your own converter.)




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45420334
  
I personally think Converter is good to keep because it's the only way to 
expose arbitrary InputFormats to Python. In Java and Scala, you can always call 
SparkContext.hadoopRDD to get an RDD of Java objects of whatever type there was 
in the InputFormat. That's why no such interface is needed there. But in 
Python, there needs to be a way to turn the Java type into a Pickle-able 
object, and this way seems as good as any.

We should just document this as an advanced feature that is less 
preferred than using Spark SQL when Spark SQL supports the given data source.
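
As a tiny illustration of that extension point - with a made-up custom Writable, not anything shipped with Spark - a converter only has to hand back something Pyrolite can pickle:

```scala
import java.io.{DataInput, DataOutput}

import org.apache.hadoop.io.Writable
import org.apache.spark.api.python.Converter

// hypothetical custom Writable holding a 2D point; the details are illustrative only
class PointWritable(var x: Double, var y: Double) extends Writable {
  def this() = this(0.0, 0.0)
  override def write(out: DataOutput): Unit = { out.writeDouble(x); out.writeDouble(y) }
  override def readFields(in: DataInput): Unit = { x = in.readDouble(); y = in.readDouble() }
}

// turn the custom Writable into a plain java.util.Map, which Pyrolite pickles to a Python dict
class PointConverter extends Converter[PointWritable, java.util.Map[String, Double]] {
  override def convert(p: PointWritable): java.util.Map[String, Double] = {
    val m = new java.util.HashMap[String, Double]()
    m.put("x", p.x)
    m.put("y", p.y)
    m
  }
}
```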




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45415893
  
So after looking at this more: I think it's great to add support for the 
SequenceFile format in Python - it will be super useful.

This patch takes things a step further and also introduces an API for users 
to plug in different types of Hadoop input formats. I'm a bit concerned about 
exposing that to users (which this patch does via the `Converter` stuff) 
because I'm sure people will start building on it and it's not an interface I'd 
be happy supporting going forward. For one thing, it only applies to Python. 
Another thing is that there are other outstanding community proposals for how 
to deal with things like HBase support (see #194). In the near future (possibly 
1.1) we're planning to standardize the way this works via SchemaRDD, and that 
will be automatically supported in Python. Basically, we'll have ways to read 
and write to a SchemaRDD from several storage systems.

So I just don't think it's a good idea to introduce a different pluggable 
mechanism here and encourage users to integrate at this level, and I wonder if 
we should just merge a version of this patch that only exposes sequenceFile and 
not the more general converter mechanism.

@mateiz @marmbrus curious to hear your thoughts as well




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13520371
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.SparkContext
+import org.apache.hadoop.io._
+import scala.Array
+import java.io.{DataOutput, DataInput}
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+import org.apache.spark.api.java.JavaSparkContext
+
+/**
+ * A class to test MsgPack serialization on the Scala side, that will be 
deserialized
+ * in Python
+ * @param str
+ * @param int
+ * @param double
+ */
+case class TestWritable(var str: String, var int: Int, var double: Double) 
extends Writable {
--- End diff --

All the classes in this file should be `private[spark]` since they are not 
in the test package.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45414134
  
Hey @MLnick thanks for making those changes. This is looking good! I didn't 
get to totally finish the review yesterday but I'll make sure I finish it today




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45412791
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45412792
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15526/




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45411714
  
Merged build started. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45411710
  
 Merged build triggered. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread MLnick
Github user MLnick commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45411654
  
@pwendell @mateiz ok, I addressed those comments and simplified things 
quite a bit. I removed the registry and just used ```.get()``` (etc.) directly 
for the standard ```Writable``` types.

Also created a factory method for instantiating a ```Converter``` and did 
that early on in the ```PythonRDD``` methods.

Finally, used ```RDD.first()``` to check for Pickle-ability of the keys and 
values.
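
Roughly, the factory-method part amounts to something like this sketch (the object and method names are illustrative, not the exact code in the patch):

```scala
import scala.util.{Failure, Success, Try}

import org.apache.spark.Logging
import org.apache.spark.api.python.Converter

object ConverterFactory extends Logging {

  // resolve the (optional) user-supplied converter class once, up front,
  // falling back to the supplied default behaviour when none is given
  def getInstance(converterClass: Option[String],
                  default: Converter[Any, Any]): Converter[Any, Any] = {
    converterClass.map { cc =>
      Try(Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]) match {
        case Success(converter) =>
          logInfo(s"Loaded converter: $cc")
          converter
        case Failure(err) =>
          logError(s"Failed to load converter: $cc", err)
          throw err
      }
    }.getOrElse(default)
  }
}
```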




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-07 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13518669
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this 
trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] {
+  def convert(obj: T): U
+}
+
+/**
+ * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] object DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+/**
+ * The converter registry holds a key and value converter, so that they 
are only instantiated
+ * once per RDD partition.
+ */
+private[python] class ConverterRegistry extends Logging {
+
+  var keyConverter: Converter[Any, Any] = DefaultConverter
+  var valueConverter: Converter[Any, Any] = DefaultConverter
+
+  def convertKey(obj: Any): Any = keyConverter.convert(obj)
+
+  def convertValue(obj: Any): Any = valueConverter.convert(obj)
+
+  def registerKeyConverter(converterClass: String) = {
+keyConverter = register(converterClass)
+logInfo(s"Loaded and registered key converter ($converterClass)")
+  }
+
+  def registerValueConverter(converterClass: String) = {
+valueConverter = register(converterClass)
+logInfo(s"Loaded and registered value converter ($converterClass)")
+  }
+
+  private def register(converterClass: String): Converter[Any, Any] = {
+Try {
+  val converter = 
Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
+  converter
+} match {
+  case Success(s) => s
+  case Failure(err) =>
+logError(s"Failed to register converter: $converterClass")
+throw err
+}
+  }
+}
+
+/** Utilities for working with Python objects -> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+  /**
+   * Convert a [[java.util.Map]] of properties to a 
[[org.apache.hadoop.conf.Configur

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13514748
  
--- Diff: core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala 
---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import scala.util.Try
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import scala.util.Success
+import scala.util.Failure
+import net.razorvine.pickle.Pickler
+
+
+/** Utilities for serialization / deserialization between Python and Java, 
using Pickle. */
+private[python] object SerDeUtil extends Logging {
+
+  /**
+   * Convert an RDD of key-value pairs to an RDD of serialized Python 
objects, that is usable
+   * by PySpark. By default, if serialization fails, toString is called 
and the string
+   * representation is serialized
+   */
+  def rddToPython[K, V](rdd: RDD[(K, V)]): RDD[Array[Byte]] = {
+rdd.mapPartitions { iter =>
--- End diff --

Would it be possible, instead of trying to pickle the first object of 
each partition, to just take the first element of the RDD and pickle its key 
and value? This would be cleaner and would also avoid some potentially weird 
failure conditions if, e.g., for some reason we made different decisions for 
different partitions. If you check out the SQLContext, this is exactly what it 
does:


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L299
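
A sketch of that suggestion (assuming Pyrolite's Pickler; not the code that eventually went in): check the first key/value pair of the RDD once, then apply the same decision in every partition.

```scala
import scala.util.Try

import net.razorvine.pickle.Pickler
import org.apache.spark.rdd.RDD

def rddToPython[K, V](rdd: RDD[(K, V)]): RDD[Array[Byte]] = {
  // decide once, based on the first element, whether keys and values pickle directly
  val (firstKey, firstValue) = rdd.first()
  val keyOk = Try(new Pickler().dumps(firstKey.asInstanceOf[AnyRef])).isSuccess
  val valueOk = Try(new Pickler().dumps(firstValue.asInstanceOf[AnyRef])).isSuccess

  rdd.mapPartitions { iter =>
    val pickler = new Pickler()
    iter.map { case (k, v) =>
      // fall back to toString only where pickling the real object is known to fail
      val key: Any = if (keyOk) k else k.toString
      val value: Any = if (valueOk) v else v.toString
      pickler.dumps(Array(key, value))
    }
  }
}
```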




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13514379
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this 
trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] {
+  def convert(obj: T): U
+}
+
+/**
+ * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] object DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+/**
+ * The converter registry holds a key and value converter, so that they 
are only instantiated
+ * once per RDD partition.
+ */
+private[python] class ConverterRegistry extends Logging {
+
+  var keyConverter: Converter[Any, Any] = DefaultConverter
+  var valueConverter: Converter[Any, Any] = DefaultConverter
+
+  def convertKey(obj: Any): Any = keyConverter.convert(obj)
+
+  def convertValue(obj: Any): Any = valueConverter.convert(obj)
+
+  def registerKeyConverter(converterClass: String) = {
+keyConverter = register(converterClass)
+logInfo(s"Loaded and registered key converter ($converterClass)")
+  }
+
+  def registerValueConverter(converterClass: String) = {
+valueConverter = register(converterClass)
+logInfo(s"Loaded and registered value converter ($converterClass)")
+  }
+
+  private def register(converterClass: String): Converter[Any, Any] = {
+Try {
+  val converter = 
Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
+  converter
+} match {
+  case Success(s) => s
+  case Failure(err) =>
+logError(s"Failed to register converter: $converterClass")
+throw err
+}
+  }
+}
+
+/** Utilities for working with Python objects -> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+  /**
+   * Convert a [[java.util.Map]] of properties to a 
[[org.apache.hadoop.conf.Config

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13514061
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this 
trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] {
+  def convert(obj: T): U
+}
+
+/**
+ * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] object DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+/**
+ * The converter registry holds a key and value converter, so that they 
are only instantiated
+ * once per RDD partition.
+ */
+private[python] class ConverterRegistry extends Logging {
+
+  var keyConverter: Converter[Any, Any] = DefaultConverter
+  var valueConverter: Converter[Any, Any] = DefaultConverter
+
+  def convertKey(obj: Any): Any = keyConverter.convert(obj)
+
+  def convertValue(obj: Any): Any = valueConverter.convert(obj)
+
+  def registerKeyConverter(converterClass: String) = {
+keyConverter = register(converterClass)
+logInfo(s"Loaded and registered key converter ($converterClass)")
+  }
+
+  def registerValueConverter(converterClass: String) = {
+valueConverter = register(converterClass)
+logInfo(s"Loaded and registered value converter ($converterClass)")
+  }
+
+  private def register(converterClass: String): Converter[Any, Any] = {
+Try {
+  val converter = 
Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
+  converter
+} match {
+  case Success(s) => s
+  case Failure(err) =>
+logError(s"Failed to register converter: $converterClass")
+throw err
+}
+  }
+}
+
+/** Utilities for working with Python objects -> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+  /**
+   * Convert a [[java.util.Map]] of properties to a 
[[org.apache.hadoop.conf.Config

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13513826
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this 
trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] {
+  def convert(obj: T): U
+}
+
+/**
+ * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] object DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+/**
+ * The converter registry holds a key and value converter, so that they 
are only instantiated
+ * once per RDD partition.
+ */
+private[python] class ConverterRegistry extends Logging {
+
+  var keyConverter: Converter[Any, Any] = DefaultConverter
+  var valueConverter: Converter[Any, Any] = DefaultConverter
+
+  def convertKey(obj: Any): Any = keyConverter.convert(obj)
+
+  def convertValue(obj: Any): Any = valueConverter.convert(obj)
+
+  def registerKeyConverter(converterClass: String) = {
+keyConverter = register(converterClass)
+logInfo(s"Loaded and registered key converter ($converterClass)")
+  }
+
+  def registerValueConverter(converterClass: String) = {
+valueConverter = register(converterClass)
+logInfo(s"Loaded and registered value converter ($converterClass)")
+  }
+
+  private def register(converterClass: String): Converter[Any, Any] = {
+Try {
+  val converter = 
Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
+  converter
+} match {
+  case Success(s) => s
+  case Failure(err) =>
+logError(s"Failed to register converter: $converterClass")
+throw err
+}
+  }
+}
+
+/** Utilities for working with Python objects -> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+  /**
+   * Convert a [[java.util.Map]] of properties to a 
[[org.apache.hadoop.conf.Config

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13513810
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this 
trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] {
+  def convert(obj: T): U
+}
+
+/**
+ * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] object DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+/**
+ * The converter registry holds a key and value converter, so that they 
are only instantiated
+ * once per RDD partition.
+ */
+private[python] class ConverterRegistry extends Logging {
+
+  var keyConverter: Converter[Any, Any] = DefaultConverter
+  var valueConverter: Converter[Any, Any] = DefaultConverter
+
+  def convertKey(obj: Any): Any = keyConverter.convert(obj)
+
+  def convertValue(obj: Any): Any = valueConverter.convert(obj)
+
+  def registerKeyConverter(converterClass: String) = {
+keyConverter = register(converterClass)
+logInfo(s"Loaded and registered key converter ($converterClass)")
+  }
+
+  def registerValueConverter(converterClass: String) = {
+valueConverter = register(converterClass)
+logInfo(s"Loaded and registered value converter ($converterClass)")
+  }
+
+  private def register(converterClass: String): Converter[Any, Any] = {
+Try {
+  val converter = 
Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
+  converter
+} match {
+  case Success(s) => s
+  case Failure(err) =>
+logError(s"Failed to register converter: $converterClass")
+throw err
+}
+  }
+}
+
+/** Utilities for working with Python objects -> Hadoop-related objects */
+private[python] object PythonHadoopUtil {
+
+  /**
+   * Convert a [[java.util.Map]] of properties to a 
[[org.apache.hadoop.conf.Config

[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13513759
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * A trait for use with reading custom classes in PySpark. Implement this 
trait and add custom
+ * transformation code by overriding the convert method.
+ */
+@Experimental
+trait Converter[T, U] {
+  def convert(obj: T): U
+}
+
+/**
+ * A converter that handles conversion of common 
[[org.apache.hadoop.io.Writable]] objects.
+ * Other objects are passed through without conversion.
+ */
+private[python] object DefaultConverter extends Converter[Any, Any] {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+/**
+ * The converter registry holds a key and value converter, so that they 
are only instantiated
+ * once per RDD partition.
+ */
+private[python] class ConverterRegistry extends Logging {
--- End diff --

Could this code be simplified by removing this class? Since this is already 
using a `mapPartitions` block, there will only be one instance of each converter 
created for each partition anyway.
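
A sketch of the suggested simplification (method and parameter names are illustrative; the caller would pass the default converter's class name when no custom one is given):

```scala
import org.apache.spark.api.python.Converter
import org.apache.spark.rdd.RDD

def convertRDD(rdd: RDD[(Any, Any)],
               keyConverterClass: String,
               valueConverterClass: String): RDD[(Any, Any)] = {

  def load(name: String): Converter[Any, Any] =
    Class.forName(name).newInstance().asInstanceOf[Converter[Any, Any]]

  rdd.mapPartitions { iter =>
    // one converter instance per partition; no registry or shared mutable state needed
    val keyConverter = load(keyConverterClass)
    val valueConverter = load(valueConverterClass)
    iter.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }
}
```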




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45365824
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15511/




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45365821
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45361465
  
Merged build started. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45357583
  
 Merged build triggered. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45344851
  
Merged build finished. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45344852
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15508/




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45344727
  
Merged build started. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45320861
  
 Merged build triggered. 




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-05 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13472864
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+
+
+trait Converter {
--- End diff --

That seems okay; are you worried that you have to cast it? You can cast any 
Converter to `Converter[Any, Any]` safely due to type erasure.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-05 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13428056
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+
+
+trait Converter {
--- End diff --

Making this change I end up with:

```scala
private[python] class ConverterRegistry extends Logging {

  var keyConverter: Converter[Any, Any] = DefaultConverter
  var valueConverter: Converter[Any, Any] = DefaultConverter

...

  private def register(converterClass: String): Converter[Any, Any] = {
Try {
  val converter = 
Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
  converter
} match {
  case Success(s) => s
  case Failure(err) =>
logError(s"Failed to register converter: $converterClass")
throw err
}
  }
```

Is this the best approach or is there another better way?




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13426219
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala ---
@@ -32,6 +32,23 @@ import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._
+import org.apache.spark.api.python.Converter
+
+class CassandraCQLKeyConverter extends Converter {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
+  }
+}
+
+class CassandraCQLValueConverter extends Converter {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
+  }
+}
--- End diff --

will do




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13426207
  
--- Diff: docs/programming-guide.md ---
@@ -378,9 +378,82 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
+## SequenceFile and Hadoop InputFormats
+
+In addition to reading text files, PySpark supports reading 
[SequenceFile](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html)
 
+and any arbitrary 
[InputFormat](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/InputFormat.html).
+
+### Writable Support
+
+PySpark SequenceFile support loads an RDD within Java, and pickles the 
resulting Java objects using
+[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables 
are automatically converted:
+
+
+Writable Type | Scala Type | Python Type
+Text | String | unicode str
+IntWritable | Int | int
+FloatWritable | Float | float
+DoubleWritable | Double | float
+BooleanWritable | Boolean | bool
+BytesWritable | Array[Byte] | bytearray
+NullWritable | null | None
+ArrayWritable | Array[T] | list of primitives, or tuple of objects
+MapWritable | java.util.Map[K, V] | dict
+Custom Class | Custom Class conforming to Java Bean conventions | dict of public properties (via JavaBean getters and setters) + __class__ for the class type
+
--- End diff --

Good point, can make it more concise.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13426202
  
--- Diff: docs/programming-guide.md ---
@@ -378,9 +378,82 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
+## SequenceFile and Hadoop InputFormats
+
+In addition to reading text files, PySpark supports reading 
[SequenceFile](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html)
 
+and any arbitrary 
[InputFormat](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/InputFormat.html).
+
+### Writable Support
+
+PySpark SequenceFile support loads an RDD within Java, and pickles the 
resulting Java objects using
+[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables 
are automatically converted:
+
+
+Writable Type | Scala Type | Python Type
+Text | String | unicode str
+IntWritable | Int | int
+FloatWritable | Float | float
+DoubleWritable | Double | float
+BooleanWritable | Boolean | bool
+BytesWritable | Array[Byte] | bytearray
+NullWritable | null | None
+ArrayWritable | Array[T] | list of primitives, or tuple of objects
+MapWritable | java.util.Map[K, V] | dict
+Custom Class | Custom Class conforming to Java Bean conventions | dict of public properties (via JavaBean getters and setters) + __class__ for the class type
+
+
+### Loading SequenceFiles
+
+Similarly to text files, SequenceFiles can be loaded by specifying the 
path. The key and value
+classes can be specified, but for standard Writables it should work 
without requiring this.
+
+{% highlight python %}
+>>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
+>>> rdd.collect() # this example has DoubleWritable keys and Text 
values
+[(1.0, u'aa'),
+ (2.0, u'bb'),
+ (2.0, u'aa'),
+ (3.0, u'cc'),
+ (2.0, u'bb'),
+ (1.0, u'aa')]
+>>> help(sc.sequenceFile) # Show sequencefile documentation
+{% endhighlight %}
+
+### Loading Arbitrary Hadoop InputFormats
+
+PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' 
Hadoop APIs. If required,
+a Hadoop configuration can be passed in as a Python dict. Here is an 
example using the
+Elasticsearch ESInputFormat:
+
+{% highlight python %}
+$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
+>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is 
running on localhost defaults
+>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
+"org.apache.hadoop.io.NullWritable", 
"org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
+>>> rdd.first() # the result is a MapWritable that is converted to 
a Python dict
+(u'Elasticsearch ID',
+ {u'field1': True,
+  u'field2': u'Some Text',
+  u'field3': 12345})
+>>> help(sc.newAPIHadoopRDD) # Show help for new API Hadoop RDD
+{% endhighlight %}
+
+Note that, if the InputFormat simply depends on a Hadoop configuration 
and/or input path, and
+the key and value classes can easily be converted according to the above 
table,
+then this approach should work well for such cases.
+
+If you have custom serialized binary data (like pulling data from 
Cassandra / HBase) or custom
+classes that don't conform to the JavaBean requirements, then you will 
probably have to first
+transform that data on the Scala/Java side to something which can be 
handled by Pyrolite's pickler.
+
+Future support for custom 'converter' functions for keys/values that 
allows this to be written in Java/Scala,
+and called from Python, as well as support for writing data out as 
SequenceFileOutputFormat
+and other OutputFormats, is forthcoming.
--- End diff --

Ah sorry missed this. Will amend.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45160638
  
CC @ahirreddy, @joshrosen, @holdenk for comments




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13415318
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+
+
+trait Converter {
--- End diff --

Also, add a ScalaDoc to this and add `:: Experimental ::` at the beginning 
of the doc and the annotation `@ExperimentalAPI`. Then it will be marked 
experimental in the docs, similar to 
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.FutureAction.
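
For reference, the later revision quoted earlier in this thread follows that pattern, with the annotation spelled `@Experimental` (from `org.apache.spark.annotation`) rather than `@ExperimentalAPI`:

```scala
import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * A trait for use with reading custom classes in PySpark. Implement this trait
 * and add custom transformation code by overriding the convert method.
 */
@Experimental
trait Converter[T, U] {
  def convert(obj: T): U
}
```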




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13415338
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala ---
@@ -32,6 +32,23 @@ import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._
+import org.apache.spark.api.python.Converter
+
+class CassandraCQLKeyConverter extends Converter {
+  import collection.JavaConversions._
--- End diff --

Could probably change this to only import `mapAsJavaMap`.
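
A sketch of the narrower import (not from the patch): the implicit Java-to-Scala view is also needed for `mapValues` on the `java.util.Map`, so two names are imported instead of the whole `JavaConversions` object. The `ByteBufferUtil` import is assumed to match the one already in `CassandraCQLTest.scala`.

```scala
import java.nio.ByteBuffer

// mapAsScalaMap supplies the implicit view that makes mapValues available on
// the java.util.Map; mapAsJavaMap converts the result back for the pickler.
import scala.collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}

import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.spark.api.python.Converter

// Same converter as in the diff above, against this revision's
// unparametrized Converter trait.
class CassandraCQLKeyConverter extends Converter {
  override def convert(obj: Any) = {
    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
  }
}
```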




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45160411
  
Hey Nick, this looks really great! Thanks for taking the time to write 
examples too. I made some comments on the Converter API and docs but I think it 
will be more or less ready to go soon.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13415225
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+
+
+trait Converter {
+  def convert(obj: Any): Any
+}
+
+object DefaultConverter extends Converter {
--- End diff --

Should be `private[python]`




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13415213
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+
+
+trait Converter {
--- End diff --

IMO this should be parametrized as `trait Converter[T, U]` and have `def 
convert(obj: T): U`. This will make it easier to write subclasses in Java or 
Scala and it won't affect anything below because of type erasure (in the worst 
case it means a bit more ugly code to pass this stuff through, e.g. casting the 
converter to a `Converter[Any, Any]`).
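
A minimal sketch of that shape, with illustrative names. The cast compiles and succeeds at runtime because the type parameters are erased, and the compiler-generated bridge method dispatches to the typed `convert`:

```scala
import org.apache.hadoop.io.IntWritable

// The parametrized trait as suggested.
trait Converter[T, U] {
  def convert(obj: T): U
}

// A typed subclass is easy to write from Java or Scala.
class IntWritableToInt extends Converter[IntWritable, Int] {
  override def convert(obj: IntWritable): Int = obj.get()
}

object ErasureDemo {
  def main(args: Array[String]): Unit = {
    // Internally the instance can be treated as Converter[Any, Any].
    val anyConverter = (new IntWritableToInt).asInstanceOf[Converter[Any, Any]]
    println(anyConverter.convert(new IntWritable(42)))  // prints 42
  }
}
```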




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13415074
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+import scala.util.{Failure, Success, Try}
+
+
+trait Converter {
+  def convert(obj: Any): Any
+}
+
+object DefaultConverter extends Converter {
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying 
primitive, String or
+   * object representation
+   */
+  private def convertWritable(writable: Writable): Any = {
+import collection.JavaConversions._
+writable match {
+  case iw: IntWritable => 
SparkContext.intWritableConverter().convert(iw)
+  case dw: DoubleWritable => 
SparkContext.doubleWritableConverter().convert(dw)
+  case lw: LongWritable => 
SparkContext.longWritableConverter().convert(lw)
+  case fw: FloatWritable => 
SparkContext.floatWritableConverter().convert(fw)
+  case t: Text => SparkContext.stringWritableConverter().convert(t)
+  case bw: BooleanWritable => 
SparkContext.booleanWritableConverter().convert(bw)
+  case byw: BytesWritable => 
SparkContext.bytesWritableConverter().convert(byw)
+  case n: NullWritable => null
+  case aw: ArrayWritable => aw.get().map(convertWritable(_))
+  case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) =>
+(convertWritable(k), convertWritable(v))
+  }.toMap)
+  case other => other
+}
+  }
+
+  def convert(obj: Any): Any = {
+obj match {
+  case writable: Writable =>
+convertWritable(writable)
+  case _ =>
+obj
+}
+  }
+}
+
+class ConverterRegistry extends Logging {
--- End diff --

This should probably be `private[spark]`




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13415006
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala 
---
@@ -17,12 +17,21 @@
 
 package org.apache.spark.examples
 
-import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.client.{Result, HBaseAdmin}
 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 
 import org.apache.spark._
 import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.util.Bytes
+
+class HBaseConverter extends Converter {
+  override def convert(obj: Any) = {
+val result = obj.asInstanceOf[Result]
+Bytes.toStringBinary(result.value())
+  }
+}
--- End diff --

Ditto on moving this to a separate file




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13414996
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala ---
@@ -32,6 +32,23 @@ import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._
+import org.apache.spark.api.python.Converter
+
+class CassandraCQLKeyConverter extends Converter {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
+  }
+}
+
+class CassandraCQLValueConverter extends Converter {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
+  }
+}
--- End diff --

Move these to separate source files since they're not used in the Scala 
example. Maybe we can even have a "pythonConverters" subpackage of 
org.apache.spark.examples that says these are used in the .py examples.
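
A sketch of that layout, using the HBase converter from the other example; the package name and file path here are illustrative, not taken from the patch, and the Cassandra converters above would move the same way:

```scala
// Hypothetical file:
// examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
package org.apache.spark.examples.pythonconverters

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.api.python.Converter

// Identical to the converter currently defined inside HBaseTest.scala, just
// kept out of the Scala example since it is only used from the Python side.
class HBaseConverter extends Converter {
  override def convert(obj: Any) = {
    val result = obj.asInstanceOf[Result]
    Bytes.toStringBinary(result.value())
  }
}
```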




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13414912
  
--- Diff: docs/programming-guide.md ---
@@ -378,9 +378,82 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
+## SequenceFile and Hadoop InputFormats
+
+In addition to reading text files, PySpark supports reading 
[SequenceFile](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html)
 
+and any arbitrary 
[InputFormat](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/InputFormat.html).
+
+### Writable Support
+
+PySpark SequenceFile support loads an RDD within Java, and pickles the 
resulting Java objects using
+[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables 
are automatically converted:
+
+
+Writable Type | Scala Type | Python Type
+Text | String | unicode str
+IntWritable | Int | int
+FloatWritable | Float | float
+DoubleWritable | Double | float
+BooleanWritable | Boolean | bool
+BytesWritable | Array[Byte] | bytearray
+NullWritable | null | None
+ArrayWritable | Array[T] | list of primitives, or tuple of objects
+MapWritable | java.util.Map[K, V] | dict
+Custom Class | Custom Class conforming to Java Bean conventions | dict of public properties (via JavaBean getters and setters) + __class__ for the class type
+
+
+### Loading SequenceFiles
+
+Similarly to text files, SequenceFiles can be loaded by specifying the 
path. The key and value
+classes can be specified, but for standard Writables it should work 
without requiring this.
+
+{% highlight python %}
+>>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
+>>> rdd.collect() # this example has DoubleWritable keys and Text 
values
+[(1.0, u'aa'),
+ (2.0, u'bb'),
+ (2.0, u'aa'),
+ (3.0, u'cc'),
+ (2.0, u'bb'),
+ (1.0, u'aa')]
+>>> help(sc.sequenceFile) # Show sequencefile documentation
+{% endhighlight %}
+
+### Loading Arbitrary Hadoop InputFormats
+
+PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' 
Hadoop APIs. If required,
+a Hadoop configuration can be passed in as a Python dict. Here is an 
example using the
+Elasticsearch ESInputFormat:
+
+{% highlight python %}
+$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
+>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is 
running on localhost defaults
+>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
+"org.apache.hadoop.io.NullWritable", 
"org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
+>>> rdd.first() # the result is a MapWritable that is converted to 
a Python dict
+(u'Elasticsearch ID',
+ {u'field1': True,
+  u'field2': u'Some Text',
+  u'field3': 12345})
+>>> help(sc.newAPIHadoopRDD) # Show help for new API Hadoop RDD
+{% endhighlight %}
+
+Note that, if the InputFormat simply depends on a Hadoop configuration 
and/or input path, and
+the key and value classes can easily be converted according to the above 
table,
+then this approach should work well for such cases.
+
+If you have custom serialized binary data (like pulling data from 
Cassandra / HBase) or custom
+classes that don't conform to the JavaBean requirements, then you will 
probably have to first
+transform that data on the Scala/Java side to something which can be 
handled by Pyrolite's pickler.
+
+Future support for custom 'converter' functions for keys/values that 
allows this to be written in Java/Scala,
+and called from Python, as well as support for writing data out as 
SequenceFileOutputFormat
+and other OutputFormats, is forthcoming.
--- End diff --

Is this still true or can we link to the Converter interface here?




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13414845
  
--- Diff: docs/programming-guide.md ---
@@ -378,9 +378,82 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
+## SequenceFile and Hadoop InputFormats
+
+In addition to reading text files, PySpark supports reading 
[SequenceFile](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html)
 
+and any arbitrary 
[InputFormat](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/InputFormat.html).
+
+### Writable Support
+
+PySpark SequenceFile support loads an RDD within Java, and pickles the 
resulting Java objects using
+[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables 
are automatically converted:
+
+
+Writable Type | Scala Type | Python Type
+Text | String | unicode str
+IntWritable | Int | int
+FloatWritable | Float | float
+DoubleWritable | Double | float
+BooleanWritable | Boolean | bool
+BytesWritable | Array[Byte] | bytearray
+NullWritable | null | None
+ArrayWritable | Array[T] | list of primitives, or tuple of objects
+MapWritable | java.util.Map[K, V] | dict
+Custom Class | Custom Class conforming to Java Bean conventions | dict of public properties (via JavaBean getters and setters) + __class__ for the class type
+
--- End diff --

Does this table need a "Scala type" column? It seems like for anything 
except the last one, you can add just the Python type, and for the "custom 
class" you can make the left column be "custom class conforming to Java Bean 
conventions"




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/455#discussion_r13414789
  
--- Diff: docs/programming-guide.md ---
@@ -378,9 +378,82 @@ Some notes on reading files with Spark:
 
 * The `textFile` method also takes an optional second argument for 
controlling the number of slices of the file. By default, Spark creates one 
slice for each block of the file (blocks being 64MB by default in HDFS), but 
you can also ask for a higher number of slices by passing a larger value. Note 
that you cannot have fewer slices than blocks.
 
-Apart reading files as a collection of lines,
+Apart from reading files as a collection of lines,
 `SparkContext.wholeTextFiles` lets you read a directory containing 
multiple small text files, and returns each of them as (filename, content) 
pairs. This is in contrast with `textFile`, which would return one record per 
line in each file.
 
+## SequenceFile and Hadoop InputFormats
--- End diff --

You probably need to use `###` for subheadings since the one above was 
already `##`; see how it looks in the doc once built (you can do `SKIP_API=1 
jekyll serve --watch` in `docs/` and then go to http://localhost:4000).




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45107235
  
All automated tests passed.
Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15448/




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45107234
  
Merged build finished. All automated tests passed.




[GitHub] spark pull request: SPARK-1416: PySpark support for SequenceFile a...

2014-06-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/455#issuecomment-45101262
  

Refer to this link for build results: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15447/



