RE: Strongly Connected Components

2016-11-11 Thread Shreya Agarwal
Thanks for the detailed response ☺ I will try the things you mentioned!

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Friday, November 11, 2016 4:59 PM
To: Shreya Agarwal 
Cc: Felix Cheung ; user@spark.apache.org; Denny Lee 

Subject: Re: Strongly Connected Components

Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. 
(https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)

For choosing the number of iterations: If the number of iterations is less than 
the diameter of the graph, you may get an incorrect result. But running for 
more iterations than that buys you nothing. The algorithm is basically to 
broadcast your ID to all your neighbors in the first round, and then broadcast 
the smallest ID that you have seen so far in the next rounds. So with only 1 
round you will get a wrong result unless each vertex is connected to the vertex 
with the lowest ID in that component. (Unlikely in a real graph.)
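
For intuition, here is a rough sketch of that broadcast-the-smallest-ID loop using
the Pregel API (in the spirit of the ConnectedComponents code linked below, not
the actual GraphX source); numIter plays the role of the iterations parameter
discussed here, and graph is assumed to be an existing GraphX graph:

import org.apache.spark.graphx._
import scala.reflect.ClassTag

// Sketch only: label every vertex with the smallest vertex ID it has seen so far.
def minIdPropagation[VD, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = {
  // round 0: every vertex starts with its own ID as its label
  val initial = graph.mapVertices { case (id, _) => id }
  initial.pregel(Long.MaxValue, maxIterations = numIter, activeDirection = EdgeDirection.Either)(
    // vprog: keep the smallest ID seen so far
    (_, label, msg) => math.min(label, msg),
    // sendMsg: tell a neighbour about my label only if mine is smaller than theirs
    triplet =>
      if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
      else if (triplet.dstAttr < triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
      else Iterator.empty,
    // mergeMsg: combine incoming messages by taking the minimum
    (a, b) => math.min(a, b))
}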

See 
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
 for the actual implementation.

A better algorithm exists for this problem that only requires O(log(N)) 
iterations when N is the largest component diameter. (It is described in "A 
Model of Computation for MapReduce", 
http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms GraphX's 
implementation immensely. (See the last slide of 
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
 The large advantage is due to the lower number of necessary iterations.

For why this is failing even with one iteration: I would first check your 
partitioning. Too many or too few partitions could equally cause the issue. If 
you are lucky, there is no overlap between the "too many" and "too few" domains 
:).

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal 
> wrote:
Tried GraphFrames. Still faced the same issue – the job died after a few hours. The
errors I see (and I see tons of them) are –
(I ran with 3 times the partitions as well, which was 12 times the number of
executors, but still the same.)

-
ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : 
hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: 
null Error Code : RequestBodyTooLarge

-

16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests 
outstanding when connection from /10.0.0.95:43301 is 
closed
16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 
outstanding blocks after 5000 ms
16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty 
blocks out of 1500 blocks
16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.io.IOException: Connection from /10.0.0.95:43301 
closed

-

16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
 (No such file or directory)

-

org.apache.spark.SparkException: Exception thrown in awaitResult
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
at 

Re: DataSet is not able to handle 50,000 columns to sum

2016-11-11 Thread Anil Langote
All right, thanks for the inputs. Is there any way Spark can process all
combinations in parallel in one job?

Is it ok to load the input CSV file into a DataFrame and use flatMap to create
key pairs, then use reduceByKey to sum the double arrays? I believe that will
work the same way as the agg function you are suggesting.
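
For what it's worth, a rough sketch of that flatMap + reduceByKey idea (the file
name, the parsing and the combos list are illustrative, not from this thread, and
the header row is assumed to have been stripped already):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("array-sum-sketch").getOrCreate()

// attribute-index combinations to evaluate, e.g. (Attribute_0, Attribute_1), ...
val combos = Seq(Seq(0, 1), Seq(0, 2), Seq(1, 2, 3))

val summed = spark.read.textFile("input.csv").rdd
  .flatMap { line =>
    val fields = line.split(",")
    val attrs  = fields.dropRight(1)                             // the attribute columns
    val vector = fields.last.trim.split("\\s+").map(_.toDouble)  // the space-separated doubles
    // emit one (comboId:key, vector) pair per combination, so all combinations
    // are aggregated in a single job
    combos.map(c => (c.mkString("_") + ":" + c.map(i => attrs(i)).mkString("_"), vector))
  }
  .reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })  // element-wise sum per key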

Best Regards,
Anil Langote
+1-425-633-9747

> On Nov 11, 2016, at 7:10 PM, ayan guha  wrote:
> 
> You can explore grouping sets in SQL and write an aggregate function to add 
> array wise sum.
> 
> It will boil down to something like
> 
> Select attr1,attr2...,yourAgg(Val)
> From t
> Group by attr1,attr2...
>> Grouping sets((attr1,attr2),(attr1))
> 
>> On 12 Nov 2016 04:57, "Anil Langote"  wrote:
>> Hi All,
>> 
>>  
>> 
>> I have been working on one use case and couldn't think of a better
>> solution. I have seen you very active on the Spark user list, so please share
>> your thoughts on the implementation. Below is the requirement.
>>
>> I have tried using a Dataset by splitting the double array column, but it fails
>> when the double array size grows. When I create the double array schema data
>> type, Spark doesn't allow me to sum the arrays because summing is done only on
>> numeric types. If I think about storing a file per combination in Parquet,
>> there will be too many Parquet files.
>> 
>>  
>> 
>> Input: The input file will be like the one below; in the real data there will
>> be 20 attributes & the double array will have 50,000 elements.
>> 
>>  
>> 
>>  
>> 
>> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
>> 5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
>> 3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
>> 5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
>> 3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
>> 2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>> 
>>  
>> 
>> Now, below are the possible combinations in the above data set; these are all
>> the possible combinations:
>> 
>>  
>> 
>> 1.  Attribute_0, Attribute_1
>> 
>> 2.  Attribute_0, Attribute_2
>> 
>> 3.  Attribute_0, Attribute_3
>> 
>> 4.  Attribute_1, Attribute_2
>> 
>> 5.  Attribute_2, Attribute_3
>> 
>> 6.  Attribute_1, Attribute_3
>> 
>> 7.  Attribute_0, Attribute_1, Attribute_2
>> 
>> 8.  Attribute_0, Attribute_1, Attribute_3
>> 
>> 9.  Attribute_0, Attribute_2, Attribute_3
>> 
>> 10.  Attribute_1, Attribute_2, Attribute_3
>> 
>> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>> 
>>  
>> 
>> Now we have to process all these combinations on input data preferably 
>> parallel to get good performance.
>> 
>>  
>> 
>> Attribute_0, Attribute_1
>> 
>>  
>> 
>> In this iteration the other attributes (Attribute_2, Attribute_3) are not
>> required; all we need is the Attribute_0, Attribute_1 & double array columns.
>> If you look at the data, there are two possible combinations: one is 5_3 and
>> the other is 3_2. We have to pick only the keys which have at least 2
>> occurrences; in the real data we will get thousands of them.
>> 
>>  
>> 
>>  
>> 
>> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
>> 5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
>> 3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
>> 5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
>> 3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
>> 2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>> 
>>  
>> 
>> when we do the groupBy on above dataset with columns Attribute_0, 
>> Attribute_1 we will get two records with keys 5_3 & 3_2 and each key will 
>> have two double arrays.
>> 
>>  
>> 
>> 5_3 ==> 0.2938933463658645  0.0437040427073041  0.23002681025029648  
>> 0.18003221216680454 & 0.8803063581705307  0.8101324740101096  
>> 0.48523937757683544  0.5897714618376072
>> 
>>  
>> 
>> 3_2 ==> 0.5353599620508771  0.026777650111232787  0.31473082754161674  
>> 0.2647786522276575 & 0.33960064683141955  0.46537001358164043  
>> 0.543428826489435  0.42653939565053034
>> 
>>  
>> 
>> now we have to add these double arrays index wise and produce the one array
>> 
>>  
>> 
>> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
>> 0.7698036740044117]
>> 
>> 3_2 ==> 

Exception not failing Python applications (in yarn client mode) - SparkLauncher says app succeeded, where app actually has failed

2016-11-11 Thread Elkhan Dadashov
Hi,

*Problem*:
Spark job fails, but RM page says the job succeeded, also

appHandle = sparkLauncher.startApplication()
...

appHandle.getState() returns the FINISHED state - which indicates the
application finished with a successful status - whereas the Spark job
actually failed.
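
For context, a minimal sketch (the application resource and master settings here
are illustrative) of how the handle and its state are observed in this scenario;
the problem is that the final state reported is FINISHED even though the PySpark
job itself failed:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val appHandle = new SparkLauncher()
  .setAppResource("pagerank.py")   // illustrative; the real job also ships pagerank.zip
  .setMaster("yarn")
  .setDeployMode("client")
  .startApplication(new SparkAppHandle.Listener {
    override def stateChanged(h: SparkAppHandle): Unit =
      println(s"State changed to: ${h.getState}")   // ends up reporting FINISHED here
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })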

*Environment*: Macintosh (El Capitan), Hadoop 2.7.2, Spark 2.0,
SparkLauncher 2.0.1

I have Spark job (pagerank.py) running in yarn-client mode.

*Reason of job failure*: The job fails because dependency package
pagerank.zip is missing.

*Related Jira (which indicate that bug is fixed)*:
https://issues.apache.org/jira/browse/SPARK-7736 - this was in Yarn-cluster
mode, now i face this issue in yarn-client mode.
https://issues.apache.org/jira/browse/SPARK-9416 (duplicate)

I faced same issue last year with SparkLauncher (spark-launcher_2.11) 1.4.0
version, then Marcelo had pull request which fixed the issue, and it was
working at that time (after Marcelo's fix) for yarn-cluster mode.

*Description*:
I'm launching the Spark job via SparkLauncher#startApplication(), and:
1) the RM page says the job succeeded, even though the Spark job has
failed.
2) in the container logs, I see that appHandle.getState() returned the FINISHED
state - which also means the application finished with a successful status.

But in the same map container log lines I see that *the job is actually
failed (*I launched Spark job from the map task*)*:

493 INFO: ImportError: ('No module named pagerank', , ('pagerank',))
557 INFO: ImportError: ('No module named pagerank', , ('pagerank',))
591 INFO: ImportError: ('No module named pagerank', , ('pagerank',))
655 INFO: ImportError: ('No module named pagerank', , ('pagerank',))
659 INFO: 16/11/11 18:25:37 ERROR TaskSetManager: Task 0 in stage 0.0
failed 4 times; aborting job
665 INFO: 16/11/11 18:25:37 INFO DAGScheduler: ShuffleMapStage 0 (distinct
at
/private/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/nm-local-dir/usercache//appcache/application_1478901028064_0016/container_1478901028064_0016_01_02/pag
erank.py:52) failed in 3.221 s
667 INFO: 16/11/11 18:25:37 INFO DAGScheduler: *Job 0 failed*: collect at
/private/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/nm-local-dir/usercache//appcache/application_1478901028064_0016/container_1478901028064_0016_01_02/pagerank.
py:68, took 3.303328 s
681 INFO: py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
683 INFO: : org.apache.spark.SparkException: *Job aborted due to stage
failure*: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost
task 0.3 in stage 0.0 (TID 3, ):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
705 INFO: ImportError: ('No module named pagerank', , ('pagerank',))
745 INFO: at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
757 INFO: at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
759 INFO: at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
763 INFO: at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
841 INFO: ImportError: ('No module named pagerank', , ('pagerank',))

887 INFO: Spark job with app id: application_1478901028064_0017, *State
changed to: FINISHED* - The application finished with a successful status.

And here are the log lines from the Spark job container:
16/11/11 18:25:37 ERROR Executor: Exception in task 0.2 in stage 0.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
File
"/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/nm-local-dir/usercache//appcache/application_1478901028064_0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/worker.py",
line 161, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File
"/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/nm-local-dir/usercache//appcache/application_1478901028064_0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/worker.py",
line 54, in read_command
command = serializer._read_with_length(file)
File
"/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/nm-local-dir/usercache//appcache/application_1478901028064_0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/serializers.py",
line 164, in _read_with_length
return self.loads(obj)
File
"/var/folders/9x/4m9lx2wj4qd8vwwq8n_qb2vx7mkj6g/T/hadoop/hadoop/nm-local-dir/usercache//appcache/application_1478901028064_0017/container_1478901028064_0017_01_02/pyspark.zip/pyspark/serializers.py",
line 422, in loads
return pickle.loads(obj)
File

SparkDriver memory calculation mismatch

2016-11-11 Thread Elkhan Dadashov
Hi,

The Spark website indicates the default Spark properties as follows (I did not
override any properties in the spark-defaults.conf file, and I launch Spark in
yarn-client mode):

spark.driver.memory 1g
spark.yarn.am.memory 512m
spark.yarn.am.memoryOverhead : max(spark.yarn.am.memory * 0.10, 384m)
spark.yarn.driver.memoryOverhead : max(spark.driver.memory * 0.10, 384m)

I launch Spark job via SparkLauncher#startApplication() in *Yarn-client
mode from the Map task of Hadoop job*.

*My cluster settings*:
yarn.scheduler.minimum-allocation-mb 256
yarn.scheduler.maximum-allocation-mb 2048
yarn.app.mapreduce.am.resource.mb 512
mapreduce.map.memory.mb 640
mapreduce.map.java.opts -Xmx400m
yarn.app.mapreduce.am.command-opts -Xmx448m

*Logs of Spark job*:

INFO Client: Verifying our application has not requested more than the
maximum memory capability of the cluster (2048 MB per container)
INFO Client: Will allocate *AM container*, with 896 MB memory including 384
MB overhead

INFO MemoryStore: MemoryStore started with capacity 366.3 MB

./application_1478727394310_0005/container_1478727394310_0005_01_02/stderr:INFO:
16/11/09 14:18:42 INFO BlockManagerMasterEndpoint: Registering block
manager :57246 with *366.3* MB RAM, BlockManagerId(driver,
, 57246)

*Questions*:
1) How is the driver memory calculated?

How did Spark arrive at 366 MB for the driver based on the properties described
above?

I thought the memory allocation is based on this formula (
https://www.altiscale.com/blog/spark-on-hadoop/ ):

"Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction ,where
memoryFraction=0.6, and safetyFraction=0.9. This is 1024MB x 0.6 x 0.9 =
552.96MB. However, 552.96MB is a little larger than the value as shown in
the log. This is because of the runtime overhead imposed by Scala, which is
usually around 3-7%, more or less. If you do the calculation using 982MB x
0.6 x 0.9, (982MB being approximately 4% of 1024) then you will derive the
number 530.28MB, which is what is indicated in the log file after rounding
up to 530.30MB."
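
As a side note, that quoted formula describes the pre-2.0 static memory manager. A
rough sketch of the Spark 2.x unified-memory arithmetic (assuming the defaults
spark.memory.fraction = 0.6 and 300 MB of reserved memory; the exact
Runtime.getRuntime.maxMemory figure depends on the JVM and GC settings) lands very
close to the 366.3 MB in the log:

// Approximate Spark 2.x UnifiedMemoryManager arithmetic for a 1g driver heap.
val reservedMb     = 300.0   // reserved system memory, in MB
val memoryFraction = 0.6     // spark.memory.fraction default
val jvmMaxMemoryMb = 910.0   // roughly Runtime.getRuntime.maxMemory for -Xmx1g

val unifiedRegionMb = (jvmMaxMemoryMb - reservedMb) * memoryFraction
println(f"MemoryStore capacity ~ $unifiedRegionMb%.1f MB")   // ~366 MB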

2) If Spark job is launched from the Map task via
SparkLauncher#startApplication() will driver memory respect
(mapreduce.map.memory.mb and mapreduce.map.java.opts) OR
(yarn.scheduler.maximum-allocation-mb) when launching Spark Job as child
process ?

The confusion is that SparkSubmit is a new JVM process - it is launched as a
child process of the map task and does not depend on the YARN configs. But if it
does not obey any limits (if this is the case), that will make things tricky for
the NodeManager when reporting back memory usage.

3) Is this the correct formula for calculating AM memory?

For the AM it matches this formula (
https://www.altiscale.com/blog/spark-on-hadoop/ ): how much memory to
allocate to the AM: amMemory + amMemoryOverhead.
amMemoryOverhead is set to 384MB via spark.yarn.driver.memoryOverhead.
args.amMemory is fixed at 512MB by Spark when it’s running in yarn-client
mode. Adding 384MB of overhead to 512MB provides the 896MB figure requested
by Spark.

4) For Spark Yarn-client mode, are all spark.driver properties ignored, and
only spark.yarn.am properties used ?

Thanks.


Re: Strongly Connected Components

2016-11-11 Thread Daniel Darabos
Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. (
https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51
)

For choosing the number of iterations: If the number of iterations is less
than the diameter of the graph, you may get an incorrect result. But
running for more iterations than that buys you nothing. The algorithm is
basically to broadcast your ID to all your neighbors in the first round,
and then broadcast the smallest ID that you have seen so far in the next
rounds. So with only 1 round you will get a wrong result unless each vertex
is connected to the vertex with the lowest ID in that component. (Unlikely
in a real graph.)

See
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
for the actual implementation.

A better algorithm exists for this problem that only requires O(log(N))
iterations when N is the largest component diameter. (It is described in "A
Model of Computation for MapReduce",
http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms
GraphX's implementation immensely. (See the last slide of
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
The large advantage is due to the lower number of necessary iterations.

For why this is failing even with one iteration: I would first check your
partitioning. Too many or too few partitions could equally cause the issue.
If you are lucky, there is no overlap between the "too many" and "too few"
domains :).

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal 
wrote:

> Tried GraphFrames. Still faced the same issue – the job died after a few hours. The
> errors I see (and I see tons of them) are –
>
> (I ran with 3 times the partitions as well, which was 12 times the number of
> executors, but still the same.)
>
>
>
> -
>
> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on
> Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
> Exception details: null Error Code : RequestBodyTooLarge
>
>
>
> -
>
>
>
> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests
> outstanding when connection from /10.0.0.95:43301 is closed
>
> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2
> outstanding blocks after 5000 ms
>
> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty
> blocks out of 1500 blocks
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.io.IOException: Connection from /10.0.0.95:43301 closed
>
>
>
> -
>
>
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/
> appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-
> a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>
>
>
> -
>
>
>
> org.apache.spark.SparkException: Exception thrown in awaitResult
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:77)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:75)
>
> at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at org.apache.spark.rpc.RpcTimeout.awaitResult(
> RpcTimeout.scala:83)
>
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:102)
>
> at org.apache.spark.executor.Executor.org$apache$spark$
> executor$Executor$$reportHeartBeat(Executor.scala:518)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply$mcV$sp(Executor.scala:547)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
> scala:1857)
>
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.
> scala:547)
>
> at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>
> at java.util.concurrent.FutureTask.runAndReset(
> FutureTask.java:308)
>
> at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
> at 

pyspark: accept unicode column names in DataFrame.corr and cov

2016-11-11 Thread SamPenrose
The corr() and cov() methods of DataFrame require an instance of str for
column names:

. 
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L1356

although instances of basestring appear to work for addressing columns:

. 
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L708

Humble request: could we replace the "isinstance(col1, str)" tests with
"isinstance(col1, basestring)"?

Less humble request: why test types at all? Why not just do one of {raise
KeyError, coerce to string}?

Cheers,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-accept-unicode-column-names-in-DataFrame-corr-and-cov-tp28065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataSet is not able to handle 50,000 columns to sum

2016-11-11 Thread ayan guha
You can explore grouping sets in SQL and write an aggregate function to add
array wise sum.

It will boil down to something like

Select attr1,attr2...,yourAgg(Val)
From t
Group by attr1,attr2...
Grouping sets((attr1,attr2),(attr1))
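
A sketch of that query shape in Spark SQL (the DataFrame/view names are
illustrative, and arraySum stands for a hypothetical UDAF doing the element-wise
sum of the double array, which you would have to write and register yourself):

// Assumes df holds the input with columns Attribute_0..Attribute_3 and DoubleArray,
// and that an "arraySum" UDAF has been registered via spark.udf.register(...).
df.createOrReplaceTempView("t")

val result = spark.sql("""
  SELECT Attribute_0, Attribute_1, arraySum(DoubleArray) AS summed
  FROM t
  GROUP BY Attribute_0, Attribute_1
  GROUPING SETS ((Attribute_0, Attribute_1), (Attribute_0))
""")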
On 12 Nov 2016 04:57, "Anil Langote"  wrote:

> Hi All,
>
>
>
> I have been working on one use case and couldn't think of a better
> solution. I have seen you very active on the Spark user list, so please share
> your thoughts on the implementation. Below is the requirement.
>
> I have tried using a Dataset by splitting the double array column, but it
> fails when the double array size grows. When I create the double array schema
> data type, Spark doesn't allow me to sum the arrays because summing is done
> only on numeric types. If I think about storing a file per combination in
> Parquet, there will be too many Parquet files.
>
>
>
> *Input:* The input file will be like the one below; in the real data there
> will be *20* attributes & the double array will have *50,000* elements.
>
>
>
>
>
> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
> 5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> Now, below are the possible combinations in the above data set; these are all
> the possible combinations:
>
>
>
> 1.  Attribute_0, Attribute_1
>
> 2.  Attribute_0, Attribute_2
>
> 3.  Attribute_0, Attribute_3
>
> 4.  Attribute_1, Attribute_2
>
> 5.  Attribute_2, Attribute_3
>
> 6.  Attribute_1, Attribute_3
>
> 7.  Attribute_0, Attribute_1, Attribute_2
>
> 8.  Attribute_0, Attribute_1, Attribute_3
>
> 9.  Attribute_0, Attribute_2, Attribute_3
>
> 10.  Attribute_1, Attribute_2, Attribute_3
>
> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>
>
>
> Now we have to process all these combinations on input data preferably
> parallel to get good performance.
>
>
>
> *Attribute_0, Attribute_1*
>
>
>
> In this iteration the other attributes (*Attribute_2, Attribute_3*) are
> not required; all we need is the Attribute_0, Attribute_1 & double array
> columns. If you look at the data, there are two possible combinations: one is
> 5_3 and the other is 3_2. We have to pick only the keys which have at
> least 2 occurrences; in the real data we will get thousands of them.
>
>
>
>
>
> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
> 5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> when we do the groupBy on above dataset with columns *Attribute_0,
> Attribute_1 *we will get two records with keys 5_3 & 3_2 and each key
> will have two double arrays.
>
>
>
> 5_3 ==> 0.2938933463658645  0.0437040427073041  0.23002681025029648
> 0.18003221216680454 & 0.8803063581705307  0.8101324740101096
> 0.48523937757683544  0.5897714618376072
>
>
>
> 3_2 ==> 0.5353599620508771  0.026777650111232787  0.31473082754161674
> 0.2647786522276575 & 0.33960064683141955  0.46537001358164043
> 0.543428826489435  0.42653939565053034
>
>
>
> now we have to add these double arrays index wise and produce the one array
>
>
>
> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319,
> 0.7698036740044117]
>
> 3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518,
> 0.6913180478781878]
>
>
>
> After adding we have to compute the average, min, max, etc. on these vectors
> and store the results against the keys.
>
>
>
> Same process will be repeated for next combinations.
>
>
>
>
>
>
>
> Thank you
>
> Anil Langote
>
> +1-425-633-9747
>
>
>


appHandle.kill(), SparkSubmit Process, JVM questions related to SparkLauncher design and Spark Driver

2016-11-11 Thread Elkhan Dadashov
A few more questions to Marcelo.

Sorry Marcelo for the very long question list. I'd really appreciate your kind
help in answering these questions, in order to fully understand the design
decisions and architecture you had in mind while implementing the very helpful
SparkLauncher.

*Scenario*: The Spark job is launched via SparkLauncher#startApplication()
inside the Map task in the cluster. So I launch a Hadoop map-only job, and inside
that Map job I launch the Spark job.

These are the related processes launched when Spark job is launched from
the map task:

40588 YarnChild (this is map task container process)

40550 MRAppMaster(this is MR APP MASTER container)


*Spark Related processes:*

40602 SparkSubmit

40875 CoarseGrainedExecutorBackend

40846 CoarseGrainedExecutorBackend
40815 ExecutorLauncher

When the Spark app is started via SparkLauncher#startApplication(), the Spark
driver (inside SparkSubmit) is started as a child process - a new JVM process.

1) This child process lives outside the map task's YARN container & JVM process,
but on the same machine, right?
 The child process (SparkSubmit) will have its own JVM, right?

 As shown in the process list above SparkSubmit is separate process.

2) As everything is external to the JVM of the map task - the Spark app/driver
(inside SparkSubmit) runs in its own JVM on the same machine where the Map
container is running - you use the Process API, which offers the destroy()
and destroyForcibly() methods that apply the appropriate platform-specific
process stopping procedures.

*In order to keep the parent-child process tie, and make sure the child process
dies when the parent process dies or is killed (even not gracefully), you used
this technique*:

You created a thread with a server-side socket in accept mode on the parent, on
some port. When the child starts, you pass that port number to it as a parameter
(environment variable). The child creates a thread and opens that socket, and the
thread sits on the socket forever. If the connection ever drops, the child exits.

Marcelo, *please correct me if I am wrong*. Is this how you make sure the child
process is also killed when the parent process is killed?

3) Let's say I forcefully kill the map task (or kill the job by jobId using the
Hadoop client) which spawned the Spark job using appHandle.startApplication():

a) The Spark Driver (SparkSubmit process) will also be killed, right?
Even if the code does not get a chance to call appHandle.stop() and
appHandle.kill(), the child process will die too because of the parent-child
relationship I described above. Is this correct?

b) Assuming (3a) is correct and the driver was killed due to the parent-child
relationship, *without* the appHandle.stop() and appHandle.kill() commands
being executed, will the Executors clean up the environment (remove temp files)
before stopping?

4) To add another level of improvement, is it a good idea to attach a
ShutdownHook (Runtime.getRuntime().addShutdownHook(new ShutdownThread());)
to the map task, and inside it call these 2 functions (see the sketch below):

 appHandle.stop();
 apphandle.kill();
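
A tiny sketch of that idea (appHandle is assumed to be the SparkAppHandle returned
by startApplication(); note that a hard SIGKILL of the map task would not give the
JVM a chance to run the hook at all):

// Best-effort cleanup from a JVM shutdown hook; not guaranteed to run on a hard kill.
Runtime.getRuntime.addShutdownHook(new Thread {
  override def run(): Unit = {
    try appHandle.stop()             // ask the Spark app to stop cleanly first
    catch { case _: Throwable => () }
    appHandle.kill()                 // then fall back to killing the child process
  }
})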

Thanks.

P.S: *In the thread below you will find the design decisions behind the
appHandle.kill() implementation, as explained by Marcelo (thanks a lot) - which
is interesting to know.*

On Thu, Nov 10, 2016 at 9:22 AM Marcelo Vanzin  wrote:

> Hi Elkhan,
>
> I'd prefer if these questions were asked in the mailing list.
>
> The launcher code cannot call YARN APIs directly because Spark
> supports more than just YARN. So its API and implementation has to be
> cluster-agnostic.
>
> As for kill, this is what the docs say:
>
> """
> This will not send a {@link #stop()} message to the application, so
> it's recommended that users first try to
> stop the application cleanly and only resort to this method if that fails.
> """
>
> So if you want to stop the application first, call stop().
>
>
> On Thu, Nov 10, 2016 at 12:55 AM, Elkhan Dadashov 
> wrote:
> > Hi Marcelo,
> >
> > I have few more questions related to SparkLauncher. Will be glad and
> > thankful if you could answer them.
> >
> > It seems SparkLauncher Yarn-client or Yarn-Cluster deploy mode does not
> > matter much, as even in yarn-cluster mode  the client that launches the
> > application must remain alive for the duration of the application (or
> until
> > the app handle is disconnected) which is described in LauncherServer.java
> > JavaDoc.
> >
> > 1) In yarn-cluster mode, if the client dies, then will only the appHandle
> > will be lost, or the Spark application will also die ?
> >
> > 2) May i know why did you prefer implementing appHandle.kill() with
> killing
> > process instead of let's say :
> >
> > a) yarn application -kill 
> > b) ./bin/spark-class org.apache.spark.deploy.Client kill 
> >
> > 3) If Spark Driver is killed (by killing the process, not gracefully),
> will
> > Executors clean the environment (remove temp files) ?
> >
> > Thanks a lot.
> >
> >
>
>
>
> --
> Marcelo
>


Re: Possible DR solution

2016-11-11 Thread Mich Talebzadeh
I really don't see why one would want to set up streaming replication, unless
for situations where functionality similar to that of transactional databases is
required in big data?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 11 November 2016 at 17:24, Mich Talebzadeh 
wrote:

> I think it differs in that it starts streaming data through its own port as
> soon as the first block has landed, so the granularity is a block.
>
> However, think of it as Oracle GoldenGate replication or SAP replication
> for databases. The only difference is that if there is corruption in a block,
> with HDFS it will be replicated anyway, much like SRDF.
>
> Whereas with Oracle or SAP it is log-based replication, which stops when it
> encounters corruption.
>
> Replication depends on the block, so it can replicate Hive metadata and
> the fsimage etc., but cannot replicate the HBase memstore if HBase crashes.
>
> So that is the gist of it: streaming replication as opposed to snapshots.
>
> Sounds familiar - think of it as log shipping in the old Oracle days versus
> GoldenGate etc.
>
> hth
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 11 November 2016 at 17:14, Deepak Sharma  wrote:
>
>> Reason being you can set up hdfs duplication on your own to some other
>> cluster .
>>
>> On Nov 11, 2016 22:42, "Mich Talebzadeh" 
>> wrote:
>>
>>> reason being ?
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 11 November 2016 at 17:11, Deepak Sharma 
>>> wrote:
>>>
 This is waste of money I guess.

 On Nov 11, 2016 22:41, "Mich Talebzadeh" 
 wrote:

> starts at $4,000 per node per year all inclusive.
>
> With discount it can be halved but we are talking a node itself so if
> you have 5 nodes in primary and 5 nodes in DR we are talking about $40K
> already.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.
>
>
>
> On 11 November 2016 at 16:43, Mudit Kumar 
> wrote:
>
>> Is it feasible cost wise?
>>
>>
>>
>> Thanks,
>>
>> Mudit
>>
>>
>>
>> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
>> *Sent:* Friday, November 11, 2016 2:56 PM
>> *To:* user @spark
>> *Subject:* Possible DR solution
>>
>>
>>
>> Hi,
>>
>>
>>
>> Has anyone had experience of using WanDisco
>>  block replication to create a fault
>> tolerant solution to DR in Hadoop?
>>
>>
>>
>> The product claims that it starts replicating as soon as the first
>> data block lands 

Re: Finding a Spark Equivalent for Pandas' get_dummies

2016-11-11 Thread Nicholas Sharkey
I did get *some* help from DataBricks in terms of programmatically grabbing
the categorical variables but I can't figure out where to go from here:

*# Get all string cols/categorical cols*
*stringColList = [i[0] for i in df.dtypes if i[1] == 'string']*

*# generate OHEs for every col in stringColList*
*OHEstages = [OneHotEncoder(inputCol = categoricalCol, outputCol =
categoricalCol + "Vector") for categoricalCol in stringColList]*



On Fri, Nov 11, 2016 at 2:00 PM, Nick Pentreath 
wrote:

> For now OHE supports a single column. So you have to have 1000 OHE in a
> pipeline. However you can add them programatically so it is not too bad. If
> the cardinality of each feature is quite low, it should be workable.
>
> After that user VectorAssembler to stitch the vectors together (which
> accepts multiple input columns).
>
> The other approach is - if your features are all categorical - to encode
> the features as "feature_name=feature_value" strings. This can
> unfortunately only be done with RDD ops since a UDF can't accept multiple
> columns as input at this time. You can create a new column with all the
> feature name/value pairs as a list of strings ["feature_1=foo",
> "feature_2=bar", ...]. Then use CountVectorizer to create your binary
> vectors. This basically works like the DictVectorizer in scikit-learn.
>
>
>
> On Fri, 11 Nov 2016 at 20:33 nsharkey  wrote:
>
>> I have a dataset that I need to convert some of the the variables to
>> dummy variables. The get_dummies function in Pandas works perfectly on
>> smaller datasets but since it collects I'll always be bottlenecked by the
>> master node.
>>
>> I've looked at Spark's OHE feature and while that will work in theory I
>> have over a thousand variables I need to convert so I don't want to have to
>> do 1000+ OHE. My project is pretty simple in scope: read in a raw CSV,
>> convert the categorical variables into dummy variables, then save the
>> transformed data back to CSV. That is why I'm so interested in get_dummies
>> but it's not scalable enough for my data size (500-600GB per file).
>>
>> Thanks in advance.
>>
>> Nick
>>
>> --
>> View this message in context: Finding a Spark Equivalent for Pandas'
>> get_dummies
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-11 Thread Cody Koeninger
It is already documented that you must use a different group id, which as
far as I can tell you are still not doing.
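
Concretely, "a different group id" means something like the following (broker
address and group names are illustrative): every logically distinct consumer - the
stream and each per-channel KafkaRDD - gets its own group.id in its kafkaParams
rather than sharing one:

import org.apache.kafka.common.serialization.StringDeserializer

// Sketch only: build one kafkaParams map per logical consumer, each with its own group.id.
def kafkaParamsFor(groupId: String): Map[String, Object] = Map(
  "bootstrap.servers"  -> "broker1:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> groupId,
  "auto.offset.reset"  -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val streamParams   = kafkaParamsFor("myapp-stream")           // for the DStream consumer
val channel1Params = kafkaParamsFor("myapp-batch-channel1")   // one per KafkaRDD/channel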

On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" 
wrote:

> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>
> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy  wrote:
>
>> Ok, I have split the KafkaRDD logic so that each uses its own group and bumped
>> the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
>> ends up with a timeout and exception, so I am still perplexed on that one.
>> The new error I am getting now is a `ConcurrentModificationException`
>> when Spark is trying to remove the CachedKafkaConsumer.
>>
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(
>> KafkaConsumer.java:1431)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaC
>> onsumer.java:1361)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$ano
>> n$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>
>> Here is the basic logic:
>>
>> *Using KafkaRDD* - This takes a list of channels and processes them in
>> parallel using the KafkaRDD directly. They each use a distinct consumer
>> group (s"$prefix-$topic"), each has its own topic, and each topic
>> has 4 partitions. We routinely get timeout errors when polling for data
>> when the poll.ms is less than 2 seconds. This occurs whether or not we
>> process in parallel.
>>
>> *Example usage with KafkaRDD:*
>> val channels = Seq("channel1", "channel2")
>>
>> channels.toParArray.foreach { channel =>
>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>
>>   // Get offsets for the given topic and the consumer group "$prefix-$
>> topic"
>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>
>>   val ds = KafkaUtils.createRDD[K, V](context,
>> kafkaParams asJava,
>> offsetRanges,
>> PreferConsistent).toDS[V]
>>
>>   // Do some aggregations
>>   ds.agg(...)
>>   // Save the data
>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>   // Save offsets using a KafkaConsumer
>>   consumer.commitSync(newOffsets.asJava)
>>   consumer.close()
>> }
>>
>> I am not sure why the concurrency issue is there, as I have tried to debug it
>> and also looked at the KafkaConsumer code, but it looks
>> like it should not occur. The things to figure out are why this occurs when
>> running in parallel and also why the timeouts still occur.
>>
>> Thanks,
>>
>> Ivan
>>
>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger 
>> wrote:
>>
>>> There definitely is Kafka documentation indicating that you should use
>>> a different consumer group for logically different subscribers, this
>>> is really basic to Kafka:
>>>
>>> http://kafka.apache.org/documentation#intro_consumers
>>>
>>> As for your comment that "commit async after each RDD, which is not
>>> really viable also", how is it not viable?  Again, committing offsets
>>> to Kafka doesn't give you reliable delivery semantics unless your
>>> downstream data store is idempotent.  If your downstream data store is
>>> idempotent, then it shouldn't matter to you when offset commits
>>> happen, as long as they happen within a reasonable time after the data
>>> is written.
>>>
>>> Do you want to keep arguing with me, or follow my advice and proceed
>>> with debugging any remaining issues after you make the changes I
>>> suggested?
>>>
>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy  wrote:
>>> > With our stream version, we update the offsets for only the partition
>>> we
>>> > operating on. We even break down the partition into smaller batches
>>> and then
>>> > update the offsets after each batch within the partition. With Spark
>>> 1.6 and
>>> > Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
>>> > necessarily a Spark issue since Kafka no longer allows you to simply
>>> update
>>> > the offsets for a given consumer group. You have to subscribe or assign
>>> > partitions to even do so.
>>> >
>>> > As for storing the offsets in some other place like a DB, I don't
>>> > find this
>>> > useful because you then can't use tools like Kafka Manager. In order
>>> to do
>>> > so you would have to store in a DB and the circle back and update Kafka
>>> > afterwards. This means you have to keep two sources in sync which is
>>> not
>>> > really a good idea.
>>> >
>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>>> keeps
>>> > subscribed to the topic(s) and consumer group, while the executors
>>> prepend
>>> > "spark-executor-" to the consumer group. The stream (driver) does
>>> allow you
>>> > to commit async after each RDD, which is not really viable also. I
>>> have not
>>> > of implementing an Akka actor system on the driver and send it
>>> 

Re: Finding a Spark Equivalent for Pandas' get_dummies

2016-11-11 Thread Nick Pentreath
For now OHE supports a single column. So you have to have 1000 OHE in a
pipeline. However you can add them programatically so it is not too bad. If
the cardinality of each feature is quite low, it should be workable.

After that user VectorAssembler to stitch the vectors together (which
accepts multiple input columns).

The other approach is - if your features are all categorical - to encode
the features as "feature_name=feature_value" strings. This can
unfortunately only be done with RDD ops since a UDF can't accept multiple
columns as input at this time. You can create a new column with all the
feature name/value pairs as a list of strings ["feature_1=foo",
"feature_2=bar", ...]. Then use CountVectorizer to create your binary
vectors. This basically works like the DictVectorizer in scikit-learn.
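
A rough Scala sketch of that second approach (the thread itself is about PySpark,
and df here is just an assumed input DataFrame): build one "feature=value" token
list per row with RDD ops, then let CountVectorizer produce the binary vectors:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val stringCols = df.schema.fields.filter(_.dataType == StringType).map(_.name)

// one token list per row, e.g. ["feature_1=foo", "feature_2=bar", ...]
val tokenRows = df.select(stringCols.map(col): _*).rdd.map { row =>
  Row(stringCols.zipWithIndex.map { case (c, i) => s"$c=${row.get(i)}" }.toSeq)
}

val tokenDf = df.sparkSession.createDataFrame(
  tokenRows, StructType(Seq(StructField("tokens", ArrayType(StringType)))))

// binary = true gives 0/1 dummy-style vectors rather than counts
val encoded = new CountVectorizer()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setBinary(true)
  .fit(tokenDf)
  .transform(tokenDf)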



On Fri, 11 Nov 2016 at 20:33 nsharkey  wrote:

> I have a dataset that I need to convert some of the the variables to dummy
> variables. The get_dummies function in Pandas works perfectly on smaller
> datasets but since it collects I'll always be bottlenecked by the master
> node.
>
> I've looked at Spark's OHE feature and while that will work in theory I
> have over a thousand variables I need to convert so I don't want to have to
> do 1000+ OHE. My project is pretty simple in scope: read in a raw CSV,
> convert the categorical variables into dummy variables, then save the
> transformed data back to CSV. That is why I'm so interested in get_dummies
> but it's not scalable enough for my data size (500-600GB per file).
>
> Thanks in advance.
>
> Nick
>
> --
> View this message in context: Finding a Spark Equivalent for Pandas'
> get_dummies
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


RE: Strongly Connected Components

2016-11-11 Thread Shreya Agarwal
Tried GraphFrames. Still faced the same issue - the job died after a few hours. The
errors I see (and I see tons of them) are -
(I ran with 3 times the partitions as well, which was 12 times the number of
executors, but still the same.)

-
ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : 
hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: 
null Error Code : RequestBodyTooLarge

-

16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests 
outstanding when connection from /10.0.0.95:43301 is closed
16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 
outstanding blocks after 5000 ms
16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty 
blocks out of 1500 blocks
16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.io.IOException: Connection from /10.0.0.95:43301 closed

-

16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
 (No such file or directory)

-

org.apache.spark.SparkException: Exception thrown in awaitResult
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

-

16/11/11 13:21:54 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = 
Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103, 36162))]
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
at 

Re: Dataset API | Setting number of partitions during join/groupBy

2016-11-11 Thread Aniket Bhatnagar
Hi Shreya

The initial partitions in the Datasets were more than 1000, and after a group-by
operation the resultant Dataset had only 200 partitions (because the default
number of shuffle partitions is 200). Any further operations on the resultant
Dataset will have a maximum parallelism of 200, resulting in inefficient use of
the cluster.

I am performing multiple join & group by operations on Datasets that are
huge (8TB+) and low parallelism severely affects the time it takes to run
the data pipeline. The workaround that
sets sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ) works but it would be ideal to set partitions on a per
join/group by operation basis, like we could using the RDD API.
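
A small sketch of that workaround (Dataset names and the value 2000 are
illustrative): raise spark.sql.shuffle.partitions just around the heavy
join/groupBy and restore it afterwards, since the Dataset API has no per-operation
numPartitions argument:

// bigDs and otherDs are placeholders for the large Datasets being joined.
sparkSession.conf.set("spark.sql.shuffle.partitions", "2000")
val joined = bigDs.join(otherDs, Seq("key"))                    // this shuffle uses 2000 partitions
sparkSession.conf.set("spark.sql.shuffle.partitions", "200")    // back to the default afterwards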

Thanks,
Aniket

On Fri, Nov 11, 2016 at 6:27 PM Shreya Agarwal 
wrote:

> Curious – why do you want to repartition? Is there a subsequent step which
> fails because the number of partitions is less? Or you want to do it for a
> perf gain?
>
>
>
> Also, what were your initial Dataset partitions and how many did you have
> for the result of join?
>
>
>
> *From:* Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com]
> *Sent:* Friday, November 11, 2016 9:22 AM
> *To:* user 
> *Subject:* Dataset API | Setting number of partitions during join/groupBy
>
>
>
> Hi
>
>
>
> I can't seem to find a way to pass number of partitions while join 2
> Datasets or doing a groupBy operation on the Dataset. There is an option of
> repartitioning the resultant Dataset but it's inefficient to repartition
> after the Dataset has been joined/grouped into default number of
> partitions. With RDD API, this was easy to do as the functions accepted a
> numPartitions parameter. The only way to do this seems to be
> sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ) but
> this means that all join/groupBy operations going forward will have the
> same number of partitions.
>
>
>
> Thanks,
>
> Aniket
>


Finding a Spark Equivalent for Pandas' get_dummies

2016-11-11 Thread nsharkey
I have a dataset that I need to convert some of the the variables to dummy
variables. The get_dummies function in Pandas works perfectly on smaller
datasets but since it collects I'll always be bottlenecked by the master
node.

I've looked at Spark's OHE feature and while that will work in theory I
have over a thousand variables I need to convert so I don't want to have to
do 1000+ OHE. My project is pretty simple in scope: read in a raw CSV,
convert the categorical variables into dummy variables, then save the
transformed data back to CSV. That is why I'm so interested in get_dummies
but it's not scalable enough for my data size (500-600GB per file).

Thanks in advance.

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-a-Spark-Equivalent-for-Pandas-get-dummies-tp28064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Dataset API | Setting number of partitions during join/groupBy

2016-11-11 Thread Shreya Agarwal
Curious – why do you want to repartition? Is there a subsequent step which
fails because the number of partitions is too low? Or do you want to do it for a
perf gain?

Also, what were your initial Dataset partitions and how many did you have for 
the result of join?

From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com]
Sent: Friday, November 11, 2016 9:22 AM
To: user 
Subject: Dataset API | Setting number of partitions during join/groupBy

Hi

I can't seem to find a way to pass number of partitions while join 2 Datasets 
or doing a groupBy operation on the Dataset. There is an option of 
repartitioning the resultant Dataset but it's inefficient to repartition after 
the Dataset has been joined/grouped into default number of partitions. With RDD 
API, this was easy to do as the functions accepted a numPartitions parameter. 
The only way to do this seems to be 
sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ) but 
this means that all join/groupBy operations going forward will have the same 
number of partitions.

Thanks,
Aniket


Finding a Spark Equivalent for Pandas' get_dummies

2016-11-11 Thread Nicholas Sharkey
I have a dataset that I need to convert some of the the variables to dummy
variables. The get_dummies function in Pandas works perfectly on smaller
datasets but since it collects I'll always be bottlenecked by the master
node.

I've looked at Spark's OHE feature and while that will work in theory I
have over a thousand variables I need to convert so I don't want to have to
do 1000+ OHE. My project is pretty simple in scope: read in a raw CSV,
convert the categorical variables into dummy variables, then save the
transformed data back to CSV. That is why I'm so interested in get_dummies
but it's not scalable enough for my data size (500-600GB per file).

Thanks in advance.

Nick


Re: TallSkinnyQR

2016-11-11 Thread Iman Mohtashemi
Yes, this would be helpful; otherwise the Q part of the decomposition is
useless. One can use it to solve the system Ax = b by transposing Q and
multiplying it with b, then solving for x with A = R and b = Qt*b,
since the upper triangular matrix (R) is correctly available.
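
For reference, a minimal sketch (using the spark.mllib distributed-matrix types
discussed in this thread) of the workaround Sean suggests below: pull out the
RDD[IndexedRow], sort by row index, and only then drop the indices before calling
tallSkinnyQR:

import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, RowMatrix}

def toOrderedRowMatrix(mat: IndexedRowMatrix): RowMatrix = {
  val rows = mat.rows
    .sortBy(_.index)   // restore the ordering implied by the row indices
    .map(_.vector)     // then drop the indices
  new RowMatrix(rows)
}

// e.g. val qr = toOrderedRowMatrix(coordMat.toIndexedRowMatrix()).tallSkinnyQR(true)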

On Fri, Nov 11, 2016 at 3:56 AM Sean Owen  wrote:

> @Xiangrui / @Joseph, do you think it would be reasonable to have
> CoordinateMatrix sort the rows it creates to make an IndexedRowMatrix? in
> order to make the ultimate output of toRowMatrix less surprising when it's
> not ordered?
>
>
> On Tue, Nov 8, 2016 at 3:29 PM Sean Owen  wrote:
>
> I think the problem here is that IndexedRowMatrix.toRowMatrix does *not*
> result in a RowMatrix with rows in order of their indices, necessarily:
>
>
> // Drop its row indices.
> RowMatrix rowMat = indexedRowMatrix.toRowMatrix();
>
> What you get is a matrix where the rows are arranged in whatever order
> they were passed to IndexedRowMatrix. RowMatrix says it's for rows where
> the ordering doesn't matter, but then it's maybe surprising it has a QR
> decomposition method, because clearly the result depends on the order of
> rows in the input. (CC Yuhao Yang for a comment?)
>
> You could say, well, why doesn't IndexedRowMatrix.toRowMatrix return at
> least something with sorted rows? that would not be hard. It also won't
> return "missing" rows (all zeroes), so it would not in any event result in
> a RowMatrix whose implicit rows and ordering represented the same matrix.
> That, at least, strikes me as something to be better documented.
>
> Maybe it would be nicer still to at least sort the rows, given the
> existence of use cases like yours. For example, at least
> CoordinateMatrix.toIndexedRowMatrix could sort? that is less surprising.
>
> In any event you should be able to make it work by manually getting the
> RDD[IndexedRow] out of IndexedRowMatrix, sorting by index, then mapping it
> to Vectors and making a RowMatrix from it.
>
>
>
> On Tue, Nov 8, 2016 at 2:41 PM Iman Mohtashemi 
> wrote:
>
> Hi Sean,
> Here you go:
>
> sparsematrix.txt =
>
> row, col ,val
> 0,0,.42
> 0,1,.28
> 0,2,.89
> 1,0,.83
> 1,1,.34
> 1,2,.42
> 2,0,.23
> 3,0,.42
> 3,1,.98
> 3,2,.88
> 4,0,.23
> 4,1,.36
> 4,2,.97
>
> The vector is just the third column of the matrix which should give the
> trivial solution of [0,0,1]
>
> This translates to this which is correct
> There are zeros in the matrix (Not really sparse but just an example)
> 0.42  0.28  0.89
> 0.83  0.34  0.42
> 0.23  0.0   0.0
> 0.42  0.98  0.88
> 0.23  0.36  0.97
>
>
> Here is what I get for  the Q and R
>
> Q: -0.21470961288429483  0.23590615093828807   0.6784910613691661
> -0.3920784235278427   -0.06171221388256143  0.5847874866876442
> -0.7748216464954987   -0.4003560542230838   -0.29392323671555354
> -0.3920784235278427   0.8517909521421976-0.31435038559403217
> -0.21470961288429483  -0.23389547730301666  -0.11165321782745863
> R: -1.0712142642814275  -0.8347536340918976  -1.227672225670157
> 0.0  0.7662808691141717   0.7553315911660984
> 0.0  0.0  0.7785210939368136
>
> When running this in matlab the numbers are the same but row 1 is the last
> row and the last row is interchanged with row 3
>
>
>
> On Mon, Nov 7, 2016 at 11:35 PM Sean Owen  wrote:
>
> Rather than post a large section of code, please post a small example of
> the input matrix and its decomposition, to illustrate what you're saying is
> out of order.
>
> On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:
>
> I am getting the correct rows but they are out of order. Is this a bug or
> am
> I doing something wrong?
>
>
>
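
A minimal sketch of the workaround Sean suggests above: pull the RDD[IndexedRow]
out of the IndexedRowMatrix, sort by index, rebuild a RowMatrix, and only then
take the QR decomposition. The entries are the 5x3 example from this thread; R
is a small local matrix, so the Rx = Qt*b back-substitution Iman describes can
then be done on the driver.

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.sql.SparkSession

object SortedTallSkinnyQRSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sorted-qr-sketch").getOrCreate()
    val sc = spark.sparkContext

    // The 5x3 example matrix from the thread as (row, col, value) entries.
    val entries = sc.parallelize(Seq(
      MatrixEntry(0, 0, 0.42), MatrixEntry(0, 1, 0.28), MatrixEntry(0, 2, 0.89),
      MatrixEntry(1, 0, 0.83), MatrixEntry(1, 1, 0.34), MatrixEntry(1, 2, 0.42),
      MatrixEntry(2, 0, 0.23),
      MatrixEntry(3, 0, 0.42), MatrixEntry(3, 1, 0.98), MatrixEntry(3, 2, 0.88),
      MatrixEntry(4, 0, 0.23), MatrixEntry(4, 1, 0.36), MatrixEntry(4, 2, 0.97)))

    val indexedRows = new CoordinateMatrix(entries).toIndexedRowMatrix().rows

    // Sort by row index before dropping the indices, so the RowMatrix rows are in matrix order.
    val sortedRowMat = new RowMatrix(indexedRows.sortBy(_.index).map(_.vector))

    val qr = sortedRowMat.tallSkinnyQR(computeQ = true)
    println(qr.R)                        // local upper-triangular R
    qr.Q.rows.collect().foreach(println) // Q's rows, one per (sorted) input row
  }
}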


DataSet is not able to handle 50,000 columns to sum

2016-11-11 Thread Anil Langote
Hi All,

 

I have been working on one use case and haven't been able to think of a better 
solution. I have seen you are very active on the Spark user list, so please share 
your thoughts on the implementation. Below is the requirement.

 

I have tried using a Dataset and splitting the double array column, but it fails 
when the array size grows. When I declare the column with a double array schema 
data type, Spark doesn't allow me to sum the arrays because summing works only on 
numeric types. If I store one file per combination in Parquet, there will be far 
too many Parquet files.

 

Input: The input file will look like the sample below. In the real data there 
will be 20 attributes and the double array will have 50,000 elements.

 

 

Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5  3  5  3  0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3  2  1  3  0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5  3  5  2  0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3  2  1  3  0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2  2  0  5  0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504

 

Now below are all the possible combinations over the above data set:

1.  Attribute_0, Attribute_1
2.  Attribute_0, Attribute_2
3.  Attribute_0, Attribute_3
4.  Attribute_1, Attribute_2
5.  Attribute_2, Attribute_3
6.  Attribute_1, Attribute_3
7.  Attribute_0, Attribute_1, Attribute_2
8.  Attribute_0, Attribute_1, Attribute_3
9.  Attribute_0, Attribute_2, Attribute_3
10. Attribute_1, Attribute_2, Attribute_3
11. Attribute_1, Attribute_2, Attribute_3, Attribute_4

Now we have to process all these combinations on the input data, preferably in 
parallel to get good performance.

Take Attribute_0, Attribute_1: in this iteration the other attributes 
(Attribute_2, Attribute_3) are not required; all we need is Attribute_0, 
Attribute_1 and the double array column. If you look at the data there are two 
combinations present, 5_3 and 3_2. We have to pick only those keys that occur at 
least twice; in the real data we will get thousands of them.
 
Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5  3  5  3  0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3  2  1  3  0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5  3  5  2  0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3  2  1  3  0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2  2  0  5  0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504

 

When we do the groupBy on the above dataset with the columns Attribute_0 and 
Attribute_1, we will get two records with keys 5_3 & 3_2, and each key will have 
two double arrays.

 

5_3 ==> 0.2938933463658645  0.0437040427073041  0.23002681025029648  
0.18003221216680454 & 0.8803063581705307  0.8101324740101096  
0.48523937757683544  0.5897714618376072

 

3_2 ==> 0.5353599620508771  0.026777650111232787  0.31473082754161674  
0.2647786522276575 & 0.33960064683141955  0.46537001358164043  
0.543428826489435  0.42653939565053034

 

Now we have to add these double arrays index-wise and produce one array:

 

5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
0.7698036740044117]

3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518, 
0.6913180478781878]

 

After adding, we have to compute the average, min, max, etc. on these vectors and 
store the results against the keys.

 

The same process will be repeated for the next combinations.

 

 

 

Thank you

Anil Langote

+1-425-633-9747
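
A minimal sketch of the element-wise aggregation described above, done with the
RDD API via reduceByKey so the 50,000 doubles never become 50,000 columns. The
input path, the whitespace-separated layout and the fixed attribute positions
are assumptions; the combination (Attribute_0, Attribute_1) is the one walked
through in the message.

import org.apache.spark.sql.SparkSession

object CombinationAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("combination-agg-sketch").getOrCreate()

    // Assumed layout: the first 4 fields are the attributes, the rest is the double array.
    val numAttributes = 4
    val rows = spark.sparkContext.textFile("/data/input.tsv").map(_.split("\\s+"))

    // One combination per pass, e.g. (Attribute_0, Attribute_1) == field positions 0 and 1.
    val combination = Seq(0, 1)

    val summed = rows
      .map { fields =>
        val key    = combination.map(fields(_)).mkString("_")    // e.g. "5_3"
        val values = fields.drop(numAttributes).map(_.toDouble)  // the 50,000 doubles
        (key, (values, 1L))
      }
      .reduceByKey { case ((a, na), (b, nb)) =>
        // Element-wise sum of the two arrays, plus a row count for averaging later.
        (a.zip(b).map { case (x, y) => x + y }, na + nb)
      }
      .filter { case (_, (_, n)) => n >= 2 }                     // keep keys seen at least twice
      .mapValues { case (sums, n) =>
        (sums, sums.map(_ / n), sums.min, sums.max)              // summed vector plus a few stats
      }

    summed.take(5).foreach { case (key, (sums, _, mn, mx)) =>
      println(s"$key -> min=$mn max=$mx sums=${sums.take(4).mkString(", ")}")
    }
  }
}

Each additional combination is just another value of combination, so the passes
are independent and can be run one after another or scheduled concurrently.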

 



RDD to HDFS - Kerberos - authentication error - RetryInvocationHandler

2016-11-11 Thread Gerard Casey
Hi all,

I have an RDD that I wish to write to HDFS.

data.saveAsTextFile("hdfs://path/vertices")

This returns: WARN RetryInvocationHandler: Exception while invoking 
ClientNamenodeProtocolTranslatorPB.getFileInfo over null. Not retrying because 
try once and fail.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]

I have checked Kerberos and it is properly authenticated. The Available:[TOKEN, 
KERBEROS] part seems to show that the Spark conf file is set up correctly. 

How do I fix this?! 

Thanks! 

Geroid
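
A minimal sketch of one common fix, assuming the driver can log in from a keytab
before touching HDFS; the principal, keytab path and namenode URI are
placeholders. On YARN the usual alternative is to pass --principal and --keytab
to spark-submit, or simply to run kinit before launching the job.

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.SparkSession

object KerberosWriteSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder principal and keytab; both must be valid on the node running the driver.
    UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/etc/security/keytabs/user.keytab")

    val spark = SparkSession.builder().appName("kerberos-write-sketch").getOrCreate()
    val data  = spark.sparkContext.parallelize(Seq("a", "b", "c"))

    // With a Kerberos login in place the client no longer falls back to SIMPLE auth,
    // which is what the AccessControlException above is complaining about.
    data.saveAsTextFile("hdfs://namenode.example.com:8020/path/vertices")
  }
}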

Re: Possible DR solution

2016-11-11 Thread Mich Talebzadeh
I think it differs in that it starts streaming data through its own port as soon
as the first block has landed, so the granularity is a block.

However, think of it as Oracle GoldenGate replication or SAP replication
for databases. The only difference is that if there is corruption in a block,
with HDFS it will be replicated anyway, much like SRDF.

Whereas with Oracle or SAP it is log-based replication, which stops when it
encounters corruption.

Replication depends on the block, so it can replicate Hive metadata and
fsimage etc., but it cannot replicate the HBase memstore if HBase crashes.

So that is the gist of it: streaming replication as opposed to snapshot.

Sounds familiar: think of it as log shipping in the old Oracle days versus
GoldenGate etc.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 11 November 2016 at 17:14, Deepak Sharma  wrote:

> Reason being you can set up hdfs duplication on your own to some other
> cluster .
>
> On Nov 11, 2016 22:42, "Mich Talebzadeh" 
> wrote:
>
>> reason being ?
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 11 November 2016 at 17:11, Deepak Sharma 
>> wrote:
>>
>>> This is waste of money I guess.
>>>
>>> On Nov 11, 2016 22:41, "Mich Talebzadeh" 
>>> wrote:
>>>
 starts at $4,000 per node per year all inclusive.

 With discount it can be halved but we are talking a node itself so if
 you have 5 nodes in primary and 5 nodes in DR we are talking about $40K
 already.

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 11 November 2016 at 16:43, Mudit Kumar 
 wrote:

> Is it feasible cost wise?
>
>
>
> Thanks,
>
> Mudit
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, November 11, 2016 2:56 PM
> *To:* user @spark
> *Subject:* Possible DR solution
>
>
>
> Hi,
>
>
>
> Has anyone had experience of using WanDisco
>  block replication to create a fault
> tolerant solution to DR in Hadoop?
>
>
>
> The product claims that it starts replicating as soon as the first
> data block lands on HDFS and takes the block and sends it to DR/replicate
> site. The idea is that is faster than doing it through traditional HDFS
> copy tools which are normally batch oriented.
>
>
>
> It also claims to replicate Hive metadata as well.
>
>
>
> I wanted to gauge if anyone has used it or a competitor product. The
> claim is that they do not have competitors!
>
>
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which may
> arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary damages
> arising from such loss, damage or destruction.

Dataset API | Setting number of partitions during join/groupBy

2016-11-11 Thread Aniket Bhatnagar
Hi

I can't seem to find a way to pass the number of partitions while joining 2
Datasets or doing a groupBy operation on a Dataset. There is an option of
repartitioning the resultant Dataset, but it's inefficient to repartition
after the Dataset has been joined/grouped into the default number of
partitions. With the RDD API this was easy to do, as the functions accepted a
numPartitions parameter. The only way to do this seems to be
sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ), but
this means that all join/groupBy operations going forward will have the
same number of partitions.

Thanks,
Aniket


Re: Possible DR solution

2016-11-11 Thread Mich Talebzadeh
reason being ?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 11 November 2016 at 17:11, Deepak Sharma  wrote:

> This is waste of money I guess.
>
> On Nov 11, 2016 22:41, "Mich Talebzadeh" 
> wrote:
>
>> starts at $4,000 per node per year all inclusive.
>>
>> With discount it can be halved but we are talking a node itself so if you
>> have 5 nodes in primary and 5 nodes in DR we are talking about $40K already.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 11 November 2016 at 16:43, Mudit Kumar  wrote:
>>
>>> Is it feasible cost wise?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Mudit
>>>
>>>
>>>
>>> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
>>> *Sent:* Friday, November 11, 2016 2:56 PM
>>> *To:* user @spark
>>> *Subject:* Possible DR solution
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> Has anyone had experience of using WanDisco 
>>> block replication to create a fault tolerant solution to DR in Hadoop?
>>>
>>>
>>>
>>> The product claims that it starts replicating as soon as the first data
>>> block lands on HDFS and takes the block and sends it to DR/replicate site.
>>> The idea is that is faster than doing it through traditional HDFS copy
>>> tools which are normally batch oriented.
>>>
>>>
>>>
>>> It also claims to replicate Hive metadata as well.
>>>
>>>
>>>
>>> I wanted to gauge if anyone has used it or a competitor product. The
>>> claim is that they do not have competitors!
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn  
>>> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>
>>


Re: Possible DR solution

2016-11-11 Thread Deepak Sharma
Reason being that you can set up HDFS duplication on your own to some other
cluster.

On Nov 11, 2016 22:42, "Mich Talebzadeh"  wrote:

> reason being ?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 11 November 2016 at 17:11, Deepak Sharma  wrote:
>
>> This is waste of money I guess.
>>
>> On Nov 11, 2016 22:41, "Mich Talebzadeh" 
>> wrote:
>>
>>> starts at $4,000 per node per year all inclusive.
>>>
>>> With discount it can be halved but we are talking a node itself so if
>>> you have 5 nodes in primary and 5 nodes in DR we are talking about $40K
>>> already.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 11 November 2016 at 16:43, Mudit Kumar  wrote:
>>>
 Is it feasible cost wise?



 Thanks,

 Mudit



 *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
 *Sent:* Friday, November 11, 2016 2:56 PM
 *To:* user @spark
 *Subject:* Possible DR solution



 Hi,



 Has anyone had experience of using WanDisco 
 block replication to create a fault tolerant solution to DR in Hadoop?



 The product claims that it starts replicating as soon as the first data
 block lands on HDFS and takes the block and sends it to DR/replicate site.
 The idea is that is faster than doing it through traditional HDFS copy
 tools which are normally batch oriented.



 It also claims to replicate Hive metadata as well.



 I wanted to gauge if anyone has used it or a competitor product. The
 claim is that they do not have competitors!



 Thanks




 Dr Mich Talebzadeh



 LinkedIn  
 *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



>>>
>>>
>


Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread Russell Spitzer
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

Has all the information about Dataframes/ SparkSql

On Fri, Nov 11, 2016 at 8:52 AM kant kodali  wrote:

> Wait I cannot create CassandraSQLContext from spark-shell. is this only
> for enterprise versions?
>
> Thanks!
>
> On Fri, Nov 11, 2016 at 8:14 AM, kant kodali  wrote:
>
>
> https://academy.datastax.com/courses/ds320-analytics-apache-spark/spark-sql-spark-sql-basics
>
> On Fri, Nov 11, 2016 at 8:11 AM, kant kodali  wrote:
>
> Hi,
>
> This is spark-cassandra-connector
>  but I am looking
> more for how to use SPARK SQL and expose as a JDBC server for Cassandra.
>
> Thanks!
>
>
> On Fri, Nov 11, 2016 at 8:07 AM, Yong Zhang  wrote:
>
> Read the document on https://github.com/datastax/spark-cassandra-connector
>
>
> Yong
>
>
>
> --
> *From:* kant kodali 
> *Sent:* Friday, November 11, 2016 11:04 AM
> *To:* user @spark
> *Subject:* How to use Spark SQL to connect to Cassandra from Spark-Shell?
>
> How to use Spark SQL to connect to Cassandra from Spark-Shell?
>
> Any examples ? I use Java 8.
>
> Thanks!
> kant
>
>
>
>
>
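
For reference, a minimal sketch of the DataFrame route that the documentation
linked above describes, as run from spark-shell; the keyspace, table and column
names are placeholders, and the connector jar is assumed to be on the classpath
(e.g. via --packages) with spark.cassandra.connection.host set.

// Paste into spark-shell, which already provides the spark session.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test", "table" -> "kv"))
  .load()

df.createOrReplaceTempView("kv")
spark.sql("SELECT key, count(*) AS cnt FROM kv GROUP BY key").show()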


Re: Possible DR solution

2016-11-11 Thread Deepak Sharma
This is a waste of money, I guess.

On Nov 11, 2016 22:41, "Mich Talebzadeh"  wrote:

> starts at $4,000 per node per year all inclusive.
>
> With discount it can be halved but we are talking a node itself so if you
> have 5 nodes in primary and 5 nodes in DR we are talking about $40K already.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 11 November 2016 at 16:43, Mudit Kumar  wrote:
>
>> Is it feasible cost wise?
>>
>>
>>
>> Thanks,
>>
>> Mudit
>>
>>
>>
>> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
>> *Sent:* Friday, November 11, 2016 2:56 PM
>> *To:* user @spark
>> *Subject:* Possible DR solution
>>
>>
>>
>> Hi,
>>
>>
>>
>> Has anyone had experience of using WanDisco 
>> block replication to create a fault tolerant solution to DR in Hadoop?
>>
>>
>>
>> The product claims that it starts replicating as soon as the first data
>> block lands on HDFS and takes the block and sends it to DR/replicate site.
>> The idea is that is faster than doing it through traditional HDFS copy
>> tools which are normally batch oriented.
>>
>>
>>
>> It also claims to replicate Hive metadata as well.
>>
>>
>>
>> I wanted to gauge if anyone has used it or a competitor product. The
>> claim is that they do not have competitors!
>>
>>
>>
>> Thanks
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn  
>> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: Possible DR solution

2016-11-11 Thread Mich Talebzadeh
starts at $4,000 per node per year all inclusive.

With discount it can be halved but we are talking a node itself so if you
have 5 nodes in primary and 5 nodes in DR we are talking about $40K already.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 11 November 2016 at 16:43, Mudit Kumar  wrote:

> Is it feasible cost wise?
>
>
>
> Thanks,
>
> Mudit
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, November 11, 2016 2:56 PM
> *To:* user @spark
> *Subject:* Possible DR solution
>
>
>
> Hi,
>
>
>
> Has anyone had experience of using WanDisco 
> block replication to create a fault tolerant solution to DR in Hadoop?
>
>
>
> The product claims that it starts replicating as soon as the first data
> block lands on HDFS and takes the block and sends it to DR/replicate site.
> The idea is that is faster than doing it through traditional HDFS copy
> tools which are normally batch oriented.
>
>
>
> It also claims to replicate Hive metadata as well.
>
>
>
> I wanted to gauge if anyone has used it or a competitor product. The claim
> is that they do not have competitors!
>
>
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread kant kodali
Wait, I cannot create a CassandraSQLContext from spark-shell. Is this only for
enterprise versions?

Thanks!

On Fri, Nov 11, 2016 at 8:14 AM, kant kodali  wrote:

> https://academy.datastax.com/courses/ds320-analytics-
> apache-spark/spark-sql-spark-sql-basics
>
> On Fri, Nov 11, 2016 at 8:11 AM, kant kodali  wrote:
>
>> Hi,
>>
>> This is spark-cassandra-connector
>>  but I am looking
>> more for how to use SPARK SQL and expose as a JDBC server for Cassandra.
>>
>> Thanks!
>>
>>
>> On Fri, Nov 11, 2016 at 8:07 AM, Yong Zhang  wrote:
>>
>>> Read the document on https://github.com/datastax
>>> /spark-cassandra-connector
>>>
>>>
>>> Yong
>>>
>>>
>>>
>>> --
>>> *From:* kant kodali 
>>> *Sent:* Friday, November 11, 2016 11:04 AM
>>> *To:* user @spark
>>> *Subject:* How to use Spark SQL to connect to Cassandra from
>>> Spark-Shell?
>>>
>>> How to use Spark SQL to connect to Cassandra from Spark-Shell?
>>>
>>> Any examples ? I use Java 8.
>>>
>>> Thanks!
>>> kant
>>>
>>
>>
>


RE: Possible DR solution

2016-11-11 Thread Mudit Kumar
Is it feasible cost wise?

Thanks,
Mudit

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, November 11, 2016 2:56 PM
To: user @spark
Subject: Possible DR solution

Hi,

Has anyone had experience of using WanDisco block 
replication to create a fault tolerant solution to DR in Hadoop?

The product claims that it starts replicating as soon as the first data block 
lands on HDFS and takes the block and sends it to DR/replicate site. The idea 
is that is faster than doing it through traditional HDFS copy tools which are 
normally batch oriented.

It also claims to replicate Hive metadata as well.

I wanted to gauge if anyone has used it or a competitor product. The claim is 
that they do not have competitors!

Thanks



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




Kafka Producer within a docker Instance

2016-11-11 Thread Raghav
Hi

I run a Spark job where the executor is within a docker instance. I want
to push the Spark job output (one record at a time) to a Kafka broker which is
outside the docker instance.

Has anyone tried anything like this, where the Kafka producer is within a docker
instance and the broker is outside? I am a newbie to both Spark and Kafka, and I
am looking for some pointers to start exploring.

Thanks.

-- 
Raghav
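
Not from the original message, but a minimal sketch of pushing RDD output to
Kafka with one producer per partition. The broker address and topic name are
placeholders; the broker must advertise an address that is resolvable and
reachable from inside the container, which is usually the tricky part in
docker setups.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.SparkSession

object DockerKafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark  = SparkSession.builder().appName("docker-kafka-sketch").getOrCreate()
    val output = spark.sparkContext.parallelize(Seq("result-1", "result-2", "result-3"))

    output.foreachPartition { records =>
      // One producer per partition, created on the executor rather than the driver.
      val props = new Properties()
      props.put("bootstrap.servers", "broker.example.com:9092") // placeholder, must resolve from the container
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      records.foreach(r => producer.send(new ProducerRecord[String, String]("spark-output", r)))
      producer.close()
    }
  }
}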


Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread kant kodali
https://academy.datastax.com/courses/ds320-analytics-apache-spark/spark-sql-spark-sql-basics

On Fri, Nov 11, 2016 at 8:11 AM, kant kodali  wrote:

> Hi,
>
> This is spark-cassandra-connector
>  but I am looking
> more for how to use SPARK SQL and expose as a JDBC server for Cassandra.
>
> Thanks!
>
>
> On Fri, Nov 11, 2016 at 8:07 AM, Yong Zhang  wrote:
>
>> Read the document on https://github.com/datastax
>> /spark-cassandra-connector
>>
>>
>> Yong
>>
>>
>>
>> --
>> *From:* kant kodali 
>> *Sent:* Friday, November 11, 2016 11:04 AM
>> *To:* user @spark
>> *Subject:* How to use Spark SQL to connect to Cassandra from Spark-Shell?
>>
>> How to use Spark SQL to connect to Cassandra from Spark-Shell?
>>
>> Any examples ? I use Java 8.
>>
>> Thanks!
>> kant
>>
>
>


Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread kant kodali
Hi,

This is spark-cassandra-connector
 but I am looking
more for how to use SPARK SQL and expose as a JDBC server for Cassandra.

Thanks!


On Fri, Nov 11, 2016 at 8:07 AM, Yong Zhang  wrote:

> Read the document on https://github.com/datastax/spark-cassandra-connector
>
>
> Yong
>
>
>
> --
> *From:* kant kodali 
> *Sent:* Friday, November 11, 2016 11:04 AM
> *To:* user @spark
> *Subject:* How to use Spark SQL to connect to Cassandra from Spark-Shell?
>
> How to use Spark SQL to connect to Cassandra from Spark-Shell?
>
> Any examples ? I use Java 8.
>
> Thanks!
> kant
>


Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread Yong Zhang
Read the document on https://github.com/datastax/spark-cassandra-connector


Yong




From: kant kodali 
Sent: Friday, November 11, 2016 11:04 AM
To: user @spark
Subject: How to use Spark SQL to connect to Cassandra from Spark-Shell?

How to use Spark SQL to connect to Cassandra from Spark-Shell?

Any examples ? I use Java 8.

Thanks!
kant


How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread kant kodali
How to use Spark SQL to connect to Cassandra from Spark-Shell?

Any examples ? I use Java 8.

Thanks!
kant


Possible DR solution

2016-11-11 Thread Mich Talebzadeh
Hi,

Has anyone had experience of using WanDisco 
block replication to create a fault tolerant solution to DR in Hadoop?

The product claims that it starts replicating as soon as the first data
block lands on HDFS and takes the block and sends it to DR/replicate site.
The idea is that this is faster than doing it through traditional HDFS copy
tools which are normally batch oriented.

It also claims to replicate Hive metadata as well.

I wanted to gauge if anyone has used it or a competitor product. The claim
is that they do not have competitors!

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


load large number of files from s3

2016-11-11 Thread Shawn Wan
Hi,
We have 30 million small files (100k each) on s3. I want to know how bad it
is to load them directly from s3 (e.g. driver memory, IO, executor memory,
s3 reliability) before merging or distcp'ing them. Does anybody have
experience? Thanks in advance!

Regards,
Shawn




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/load-large-number-of-files-from-s3-tp28062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

load large number of files from s3

2016-11-11 Thread Xiaomeng Wan
Hi,
We have 30 million small files (100k each) on s3. I want to know how bad it
is to load them directly from s3 (e.g. driver memory, IO, executor memory,
s3 reliability) before merging or distcp'ing them. Does anybody have
experience? Thanks in advance!

Regards,
Shawn


Re: TallSkinnyQR

2016-11-11 Thread Sean Owen
@Xiangrui / @Joseph, do you think it would be reasonable to have
CoordinateMatrix sort the rows it creates to make an IndexedRowMatrix? in
order to make the ultimate output of toRowMatrix less surprising when it's
not ordered?

On Tue, Nov 8, 2016 at 3:29 PM Sean Owen  wrote:

> I think the problem here is that IndexedRowMatrix.toRowMatrix does *not*
> result in a RowMatrix with rows in order of their indices, necessarily:
>
>
> // Drop its row indices.
> RowMatrix rowMat = indexedRowMatrix.toRowMatrix();
>
> What you get is a matrix where the rows are arranged in whatever order
> they were passed to IndexedRowMatrix. RowMatrix says it's for rows where
> the ordering doesn't matter, but then it's maybe surprising it has a QR
> decomposition method, because clearly the result depends on the order of
> rows in the input. (CC Yuhao Yang for a comment?)
>
> You could say, well, why doesn't IndexedRowMatrix.toRowMatrix return at
> least something with sorted rows? that would not be hard. It also won't
> return "missing" rows (all zeroes), so it would not in any event result in
> a RowMatrix whose implicit rows and ordering represented the same matrix.
> That, at least, strikes me as something to be better documented.
>
> Maybe it would be nicer still to at least sort the rows, given the
> existence of use cases like yours. For example, at least
> CoordinateMatrix.toIndexedRowMatrix could sort? that is less surprising.
>
> In any event you should be able to make it work by manually getting the
> RDD[IndexedRow] out of IndexedRowMatrix, sorting by index, then mapping it
> to Vectors and making a RowMatrix from it.
>
>
>
> On Tue, Nov 8, 2016 at 2:41 PM Iman Mohtashemi 
> wrote:
>
> Hi Sean,
> Here you go:
>
> sparsematrix.txt =
>
> row, col ,val
> 0,0,.42
> 0,1,.28
> 0,2,.89
> 1,0,.83
> 1,1,.34
> 1,2,.42
> 2,0,.23
> 3,0,.42
> 3,1,.98
> 3,2,.88
> 4,0,.23
> 4,1,.36
> 4,2,.97
>
> The vector is just the third column of the matrix which should give the
> trivial solution of [0,0,1]
>
> This translates to this which is correct
> There are zeros in the matrix (Not really sparse but just an example)
> 0.42  0.28  0.89
> 0.83  0.34  0.42
> 0.23  0.0   0.0
> 0.42  0.98  0.88
> 0.23  0.36  0.97
>
>
> Here is what I get for  the Q and R
>
> Q: -0.21470961288429483  0.23590615093828807   0.6784910613691661
> -0.3920784235278427   -0.06171221388256143  0.5847874866876442
> -0.7748216464954987   -0.4003560542230838   -0.29392323671555354
> -0.3920784235278427   0.8517909521421976-0.31435038559403217
> -0.21470961288429483  -0.23389547730301666  -0.11165321782745863
> R: -1.0712142642814275  -0.8347536340918976  -1.227672225670157
> 0.0  0.7662808691141717   0.7553315911660984
> 0.0  0.0  0.7785210939368136
>
> When running this in matlab the numbers are the same but row 1 is the last
> row and the last row is interchanged with row 3
>
>
>
> On Mon, Nov 7, 2016 at 11:35 PM Sean Owen  wrote:
>
> Rather than post a large section of code, please post a small example of
> the input matrix and its decomposition, to illustrate what you're saying is
> out of order.
>
> On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:
>
> I am getting the correct rows but they are out of order. Is this a bug or
> am
> I doing something wrong?
>
>
>