Re: Livy with Spark package

2017-08-23 Thread ayan guha
Thanks and agreed :)

On Thu, Aug 24, 2017 at 12:01 PM, Saisai Shao 
wrote:

> You could set "spark.jars.packages" in the `conf` field of the session POST API (
> https://github.com/apache/incubator-livy/blob/master/docs/rest-api.md#post-sessions).
> This is equivalent to --packages in spark-submit.
>
> BTW, you'd better ask Livy questions on u...@livy.incubator.apache.org.
>
> Thanks
> Jerry
>
> On Thu, Aug 24, 2017 at 8:11 AM, ayan guha  wrote:
>
>> Hi
>>
>> I have a Python program which I invoke as
>>
>>  spark-submit --packages com.databricks:spark-avro_2.11:3.2.0
>> somefile.py "2017-08-23 02:00:00", and it works.
>>
>> Now I want to submit this file using Livy. I could work out most of the
>> stuff (like putting files on HDFS etc.), but I am not able to work out how/where
>> to configure the "packages" switch... Any help?
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Livy with Spark package

2017-08-23 Thread Saisai Shao
You could set "spark.jars.packages" in the `conf` field of the session POST API (
https://github.com/apache/incubator-livy/blob/master/docs/rest-api.md#post-sessions).
This is equivalent to --packages in spark-submit.
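
For reference, a rough sketch of such a request from Python, assuming a Livy
server at http://localhost:8998 and the avro package from the original question
(host, port and package coordinates are placeholders, not verified here):

import requests  # any HTTP client works; requests is just an example

payload = {
    "kind": "pyspark",
    "conf": {
        # equivalent of: spark-submit --packages com.databricks:spark-avro_2.11:3.2.0
        "spark.jars.packages": "com.databricks:spark-avro_2.11:3.2.0"
    },
}
resp = requests.post("http://localhost:8998/sessions", json=payload)
print(resp.status_code, resp.json())  # returns the new session's id and state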

BTW, you'd better ask Livy questions on u...@livy.incubator.apache.org.

Thanks
Jerry

On Thu, Aug 24, 2017 at 8:11 AM, ayan guha  wrote:

> Hi
>
> I have a Python program which I invoke as
>
>  spark-submit --packages com.databricks:spark-avro_2.11:3.2.0 somefile.py
>  "2017-08-23 02:00:00", and it works.
>
> Now I want to submit this file using Livy. I could work out most of the
> stuff (like putting files on HDFS etc.), but I am not able to work out how/where
> to configure the "packages" switch... Any help?
> --
> Best Regards,
> Ayan Guha
>


Re: PySpark, Structured Streaming and Kafka

2017-08-23 Thread Brian Wylie
Shixiong,

Your suggestion works if I use the pyspark-shell directly. In this case, though, I
want to set up a SparkSession from within my Jupyter notebook.

My question/issue is related to this SO question:
https://stackoverflow.com/questions/35762459/add-jar-to-standalone-pyspark

So basically I want to add --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
to the Python code that creates the session.


Something like:

# Spin up a local Spark Session
spark = SparkSession.builder.appName('my_awesome')\
    .config('spark.jars.packages',
            'org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0')\
    .getOrCreate()

Unfortunately, this doesn't actually work... :)

I'm sure it's straightforward to have Kafka work with PySpark... I'm just
naive about how the packages get loaded...
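
The closest workaround I can see in that SO thread is setting PYSPARK_SUBMIT_ARGS
before the first SparkSession is created, so the launcher picks up --packages. A
rough, untested sketch (package coordinates as above):

import os

# Must be set before the first SparkSession/SparkContext starts the JVM.
# The trailing "pyspark-shell" token is required.
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('my_awesome').getOrCreate()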


On Wed, Aug 23, 2017 at 4:51 PM, Shixiong(Ryan) Zhu  wrote:

> You can use `bin/pyspark --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0`
> to start "pyspark". If you want to use "spark-submit", you also need to
> provide your Python file.
>
> On Wed, Aug 23, 2017 at 1:41 PM, Brian Wylie 
> wrote:
>
>> Hi All,
>>
>> I'm trying the new hotness of using Kafka and Structured Streaming.
>>
>> Resources that I've looked at
>> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> - https://databricks.com/blog/2016/07/28/structured-streamin
>> g-in-apache-spark.html
>> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/St
>> ructured%20Streaming%20using%20Python%20DataFrames%20API.html
>>
>> My setup is a bit weird (yes.. yes.. I know...)
>> - Eventually I'll just use a DataBricks cluster and life will be bliss :)
>> - But for now I want to test/try stuff out on my little Mac Laptop
>>
>> The newest version of PySpark will install a local Spark server with a
>> simple:
>> $ pip install pyspark
>>
>> This is very nice. I've put together a little notebook using that kewl
>> feature:
>> - https://github.com/Kitware/BroThon/blob/master/notebooks/B
>> ro_to_Spark_Cheesy.ipynb
>>
>> So the next step is the setup/use a Kafka message queue and that went
>> well/works fine.
>>
>> $ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns
>>
>> *I get messages spitting out*
>>
>> {"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
>>
>>
>> Okay, finally getting to my question:
>> - Local spark server (good)
>> - Local kafka server and messages getting produced (good)
>> - Trying to this line of PySpark code (not good)
>>
>> # Setup connection to Kafka Stream dns_events = 
>> spark.readStream.format('kafka')\
>>   .option('kafka.bootstrap.servers', 'localhost:9092')\
>>   .option('subscribe', 'dns')\
>>   .option('startingOffsets', 'latest')\
>>   .load()
>>
>>
>> fails with:
>> java.lang.ClassNotFoundException: Failed to find data source: kafka.
>> Please find packages at http://spark.apache.org/third-party-projects.html
>>
>> I've looked that the URL listed... and poking around I can see that maybe
>> I need the kafka jar file as part of my local server.
>>
>> I lamely tried this:
>> $ spark-submit --packages org.apache.spark:spark-sql-kaf
>> ka-0-10_2.11:2.2.0
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Missing
>> application resource. at org.apache.spark.launcher.Comm
>> andBuilderUtils.checkArgument(CommandBuilderUtils.java:241) at
>> org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSpa
>> rkSubmitArgs(SparkSubmitCommandBuilder.java:160) at
>> org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSpa
>> rkSubmitCommand(SparkSubmitCommandBuilder.java:274) at
>> org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCom
>> mand(SparkSubmitCommandBuilder.java:151) at
>> org.apache.spark.launcher.Main.main(Main.java:86)
>>
>>
>> Anyway, all my code/versions/etc are in this notebook:
>> - https://github.com/Kitware/BroThon/blob/master/notebooks/Bro
>> _to_Spark.ipynb
>>
>> I'd be tremendously appreciative of some super nice, smart person if they
>> could point me in the right direction :)
>>
>> -Brian Wylie
>>
>
>


Livy with Spark package

2017-08-23 Thread ayan guha
Hi

I have a Python program which I invoke as

 spark-submit --packages com.databricks:spark-avro_2.11:3.2.0 somefile.py
 "2017-08-23 02:00:00", and it works.

Now I want to submit this file using Livy. I could work out most of the
stuff (like putting files on HDFS etc.), but I am not able to work out how/where
to configure the "packages" switch... Any help?
-- 
Best Regards,
Ayan Guha


Re: Training A ML Model on a Huge Dataframe

2017-08-23 Thread Suzen, Mehmet
SGD is supported. I see, I had assumed you were using Scala. It looks like you can
do streaming regression, though I am not sure of the PySpark API:

https://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression
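
A rough PySpark sketch of that streaming regression idea (untested; the input
path, parsing and feature dimension below are placeholders):

from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.mllib.linalg import Vectors
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 10)  # assumes an existing SparkContext `sc`, 10s batches

def parse(line):
    # assumed input format: "label,f1 f2 f3"
    label, features = line.split(',')
    return LabeledPoint(float(label), Vectors.dense([float(x) for x in features.split()]))

training = ssc.textFileStream('hdfs:///data/train').map(parse)

model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=50)
model.setInitialWeights([0.0, 0.0, 0.0])  # 3 features assumed
model.trainOn(training)  # the model is updated incrementally, one micro-batch at a time

ssc.start()
ssc.awaitTermination()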

On 23 August 2017 at 18:22, Sea aj  wrote:

> Thanks for the reply.
>
> As far as I understood, mini-batch is not yet supported in the ML library. As
> for MLlib mini-batch, I could not find any PySpark API.
>
>
>
>
> On Wed, Aug 23, 2017 at 2:59 PM, Suzen, Mehmet  wrote:
>
>> It depends on what model you would like to train but models requiring
>> optimisation could use SGD with mini batches. See:
>> https://spark.apache.org/docs/latest/mllib-optimization.html
>> #stochastic-gradient-descent-sgd
>>
>> On 23 August 2017 at 14:27, Sea aj  wrote:
>>
>>> Hi,
>>>
>>> I am trying to feed a huge dataframe to a ml algorithm in Spark but it
>>> crashes due to the shortage of memory.
>>>
>>> Is there a way to train the model on a subset of the data in multiple
>>> steps?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>>
>> Mehmet Süzen, MSc, PhD
>> 
>>
>>
>
>


-- 

Mehmet Süzen, MSc, PhD


| PRIVILEGED AND CONFIDENTIAL COMMUNICATION This e-mail transmission, and
any documents, files or previous e-mail messages attached to it, may
contain confidential information that is legally privileged. If you are not
the intended recipient or a person responsible for delivering it to the
intended recipient, you are hereby notified that any disclosure, copying,
distribution or use of any of the information contained in or attached to
this transmission is STRICTLY PROHIBITED within the applicable law. If you
have received this transmission in error, please: (1) immediately notify me
by reply e-mail to su...@acm.org,  and (2) destroy the original
transmission and its attachments without reading or saving in any manner. |


Re: PySpark, Structured Streaming and Kafka

2017-08-23 Thread Shixiong(Ryan) Zhu
You can use `bin/pyspark --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0` to start "pyspark". If
you want to use "spark-submit", you also need to provide your Python file.

On Wed, Aug 23, 2017 at 1:41 PM, Brian Wylie 
wrote:

> Hi All,
>
> I'm trying the new hotness of using Kafka and Structured Streaming.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.
> 0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> My setup is a bit weird (yes.. yes.. I know...)
> - Eventually I'll just use a DataBricks cluster and life will be bliss :)
> - But for now I want to test/try stuff out on my little Mac Laptop
>
> The newest version of PySpark will install a local Spark server with a
> simple:
> $ pip install pyspark
>
> This is very nice. I've put together a little notebook using that kewl
> feature:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/
> Bro_to_Spark_Cheesy.ipynb
>
> So the next step is the setup/use a Kafka message queue and that went
> well/works fine.
>
> $ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns
>
> *I get messages spitting out*
>
> {"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
>
>
> Okay, finally getting to my question:
> - Local spark server (good)
> - Local kafka server and messages getting produced (good)
> - Trying to this line of PySpark code (not good)
>
> # Setup connection to Kafka Stream dns_events = 
> spark.readStream.format('kafka')\
>   .option('kafka.bootstrap.servers', 'localhost:9092')\
>   .option('subscribe', 'dns')\
>   .option('startingOffsets', 'latest')\
>   .load()
>
>
> fails with:
> java.lang.ClassNotFoundException: Failed to find data source: kafka.
> Please find packages at http://spark.apache.org/third-party-projects.html
>
> I've looked that the URL listed... and poking around I can see that maybe
> I need the kafka jar file as part of my local server.
>
> I lamely tried this:
> $ spark-submit --packages org.apache.spark:spark-sql-
> kafka-0-10_2.11:2.2.0
>
> Exception in thread "main" java.lang.IllegalArgumentException: Missing
> application resource. at org.apache.spark.launcher.CommandBuilderUtils.
> checkArgument(CommandBuilderUtils.java:241) at org.apache.spark.launcher.
> SparkSubmitCommandBuilder.buildSparkSubmitArgs(SparkSubmitCommandBuilder.java:160)
> at org.apache.spark.launcher.SparkSubmitCommandBuilder.
> buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:274) at
> org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(
> SparkSubmitCommandBuilder.java:151) at org.apache.spark.launcher.
> Main.main(Main.java:86)
>
>
> Anyway, all my code/versions/etc are in this notebook:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/
> Bro_to_Spark.ipynb
>
> I'd be tremendously appreciative of some super nice, smart person if they
> could point me in the right direction :)
>
> -Brian Wylie
>


Re: PySpark, Structured Streaming and Kafka

2017-08-23 Thread Riccardo Ferrari
Hi Brian,

Very nice work you have done!

WRT your issue: can you clarify how you are adding the Kafka dependency when
using Jupyter? The ClassNotFoundException really points to a missing dependency.

The IllegalArgumentException is a different matter; it simply means you are not
telling spark-submit which application resource you want to submit:
https://issues.apache.org/jira/browse/SPARK-15360

Best,


On Wed, Aug 23, 2017 at 10:41 PM, Brian Wylie 
wrote:

> Hi All,
>
> I'm trying the new hotness of using Kafka and Structured Streaming.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.
> 0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> My setup is a bit weird (yes.. yes.. I know...)
> - Eventually I'll just use a DataBricks cluster and life will be bliss :)
> - But for now I want to test/try stuff out on my little Mac Laptop
>
> The newest version of PySpark will install a local Spark server with a
> simple:
> $ pip install pyspark
>
> This is very nice. I've put together a little notebook using that kewl
> feature:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/
> Bro_to_Spark_Cheesy.ipynb
>
> So the next step is the setup/use a Kafka message queue and that went
> well/works fine.
>
> $ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns
>
> *I get messages spitting out*
>
> {"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
>
>
> Okay, finally getting to my question:
> - Local spark server (good)
> - Local kafka server and messages getting produced (good)
> - Trying to this line of PySpark code (not good)
>
> # Setup connection to Kafka Stream dns_events = 
> spark.readStream.format('kafka')\
>   .option('kafka.bootstrap.servers', 'localhost:9092')\
>   .option('subscribe', 'dns')\
>   .option('startingOffsets', 'latest')\
>   .load()
>
>
> fails with:
> java.lang.ClassNotFoundException: Failed to find data source: kafka.
> Please find packages at http://spark.apache.org/third-party-projects.html
>
> I've looked that the URL listed... and poking around I can see that maybe
> I need the kafka jar file as part of my local server.
>
> I lamely tried this:
> $ spark-submit --packages org.apache.spark:spark-sql-
> kafka-0-10_2.11:2.2.0
>
> Exception in thread "main" java.lang.IllegalArgumentException: Missing
> application resource. at org.apache.spark.launcher.CommandBuilderUtils.
> checkArgument(CommandBuilderUtils.java:241) at org.apache.spark.launcher.
> SparkSubmitCommandBuilder.buildSparkSubmitArgs(SparkSubmitCommandBuilder.java:160)
> at org.apache.spark.launcher.SparkSubmitCommandBuilder.
> buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:274) at
> org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(
> SparkSubmitCommandBuilder.java:151) at org.apache.spark.launcher.
> Main.main(Main.java:86)
>
>
> Anyway, all my code/versions/etc are in this notebook:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/
> Bro_to_Spark.ipynb
>
> I'd be tremendously appreciative of some super nice, smart person if they
> could point me in the right direction :)
>
> -Brian Wylie
>


Re: Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-23 Thread Michael Armbrust
You can create a nested struct that contains multiple columns using
struct().

Here's a pretty complete guide on working with nested data:
https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
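
A rough PySpark sketch of the idea (the same struct/collect_list functions exist
in the Java/Scala APIs; all_df mirrors the allDf below, and c51/c52 stand in for
the extra columns you want to carry along):

from pyspark.sql import functions as F

agg_df = (all_df
          .groupBy('c1', 'c2')
          .agg(F.collect_list(F.struct('c50', 'c51', 'c52')).alias('right_rows')))
agg_df.printSchema()  # right_rows: array<struct<c50, c51, c52>>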

On Wed, Aug 23, 2017 at 2:30 PM, JG Perrin  wrote:

> Hi folks,
>
>
>
> I am trying to join 2 dataframes, but I would like to have the result as a
> list of rows of the right dataframe (dDf in the example) in a column of the
> left dataframe (cDf in the example). I made it work with *one column*,
> but having issues adding more columns/creating a row(?).
>
> Seq joinColumns = new Set2<>("c1", "c2").toSeq();
>
> Dataset allDf = cDf.join(dDf, joinColumns, "inner");
>
> allDf.printSchema();
>
> allDf.show();
>
>
>
> Dataset aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
>
> .agg(collect_list(col("c50")));
>
> aggDf.show();
>
>
>
> Output:
>
> ++---+---+
>
> |c1  |c2 |collect_list(c50)  |
>
> ++---+---+
>
> |3744|1160242| [6, 5, 4, 3, 2, 1]|
>
> |3739|1150097|[1]|
>
> |3780|1159902|[5, 4, 3, 2, 1]|
>
> | 132|1200743|   [4, 3, 2, 1]|
>
> |3778|1183204|[1]|
>
> |3766|1132709|[1]|
>
> |3835|1146169|[1]|
>
> ++---+---+
>
>
>
> Thanks,
>
>
>
> jg
>
>


Re: Chaining Spark Streaming Jobs

2017-08-23 Thread Michael Armbrust
If you use structured streaming and the file sink, you can have a
subsequent stream read using the file source. This will maintain exactly-once
processing even if there are hiccups or failures.
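
A rough PySpark sketch of that pattern (paths, the schema and the enrich()
function are placeholders):

# Job 1: persist the raw stream with the file sink.
raw_query = (raw_df.writeStream
             .format('parquet')
             .option('path', 's3a://bucket/raw/')
             .option('checkpointLocation', 's3a://bucket/checkpoints/raw/')
             .start())

# Job 2 (a separate application): treat the same directory as a streaming source.
raw_stream = (spark.readStream
              .schema(raw_schema)  # file sources need an explicit schema
              .parquet('s3a://bucket/raw/'))

enriched_query = (enrich(raw_stream).writeStream  # enrich() is a placeholder
                  .format('parquet')
                  .option('path', 's3a://bucket/enriched/')
                  .option('checkpointLocation', 's3a://bucket/checkpoints/enriched/')
                  .start())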

On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
wrote:

> Hello Spark Experts,
>
> I have a design question w.r.t Spark Streaming. I have a streaming job
> that consumes protocol buffer encoded real time logs from a Kafka cluster
> on premise. My spark application runs on EMR (aws) and persists data onto
> s3. Before I persist, I need to strip header and convert protobuffer to
> parquet (I use sparksql-scalapb to convert from Protobuff to
> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
> enrichment on the same dataframe after persisting the raw data, however, in
> order to modularize I am planning to have a separate job which picks up the
> raw data and performs enrichment on it. Also,  I am trying to avoid all in
> 1 job as the enrichments could get project specific while raw data
> persistence stays customer/project agnostic.The enriched data is allowed to
> have some latency (few minutes)
>
> My challenge is, after persisting the raw data, how do I chain the next
> streaming job. The only way I can think of is -  job 1 (raw data)
> partitions on current date (MMDD) and within current date, the job 2
> (enrichment job) filters for records within 60s of current time and
> performs enrichment on it in 60s batches.
> Is this a good option? It seems to be error prone. When either of the jobs
> get delayed due to bursts or any error/exception this could lead to huge
> data losses and non-deterministic behavior . What are other alternatives to
> this?
>
> Appreciate any guidance in this regard.
>
> regards
> Sunita Koppar
>


PySpark, Structured Streaming and Kafka

2017-08-23 Thread Brian Wylie
Hi All,

I'm trying the new hotness of using Kafka and Structured Streaming.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

My setup is a bit weird (yes.. yes.. I know...)
- Eventually I'll just use a DataBricks cluster and life will be bliss :)
- But for now I want to test/try stuff out on my little Mac Laptop

The newest version of PySpark will install a local Spark server with a
simple:
$ pip install pyspark

This is very nice. I've put together a little notebook using that kewl
feature:
-
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_to_Spark_Cheesy.ipynb

So the next step was to set up and use a Kafka message queue, and that went
well / works fine.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns

*I get messages spitting out*

{"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}


Okay, finally getting to my question:
- Local spark server (good)
- Local kafka server and messages getting produced (good)
- Trying to run this line of PySpark code (not good)

# Setup connection to Kafka Stream
dns_events = spark.readStream.format('kafka')\
  .option('kafka.bootstrap.servers', 'localhost:9092')\
  .option('subscribe', 'dns')\
  .option('startingOffsets', 'latest')\
  .load()


fails with:
java.lang.ClassNotFoundException: Failed to find data source: kafka. Please
find packages at http://spark.apache.org/third-party-projects.html

I've looked at the URL listed... and poking around, I can see that maybe I
need the Kafka jar file as part of my local server.

I lamely tried this:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Exception in thread "main" java.lang.IllegalArgumentException: Missing
application resource. at
org.apache.spark.launcher.CommandBuilderUtils.checkArgument(CommandBuilderUtils.java:241)
at
org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitArgs(SparkSubmitCommandBuilder.java:160)
at
org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:274)
at
org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:151)
at org.apache.spark.launcher.Main.main(Main.java:86)


Anyway, all my code/versions/etc are in this notebook:
-
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_to_Spark.ipynb

I'd be tremendously appreciative of some super nice, smart person if they
could point me in the right direction :)

-Brian Wylie


ReduceByKeyAndWindow checkpoint recovery issues in Spark Streaming

2017-08-23 Thread SRK
Hi,

ReduceByKeyAndWindow checkpoint recovery has issues when trying to recover
for the second time. Basically, it loses the reduced value of the previous
window, even though the value is present in the old values that need to be
inverse-reduced, resulting in the following error. Does anyone have any idea
why it does not recover properly the second time?


Neither previous window has value for key, nor new values found. Are you
sure your key class hashes consistently?
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:143)
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:130)
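
For context, the failing pattern is the inverse-function form of
reduceByKeyAndWindow on a checkpointed StreamingContext; a rough PySpark sketch
of that shape (source, durations and paths are placeholders):

from pyspark.streaming import StreamingContext

def create_context():
    ssc = StreamingContext(sc, 10)  # assumes an existing SparkContext `sc`
    ssc.checkpoint('hdfs:///checkpoints/app')
    pairs = ssc.socketTextStream('host', 9999).map(lambda w: (w, 1))  # placeholder source
    counts = pairs.reduceByKeyAndWindow(
        lambda a, b: a + b,   # reduce values entering the window
        lambda a, b: a - b,   # inverse-reduce values leaving the window
        windowDuration=60, slideDuration=10)
    counts.pprint()
    return ssc

# On restart, recovery happens here from the checkpoint directory.
ssc = StreamingContext.getOrCreate('hdfs:///checkpoints/app', create_context)
ssc.start()
ssc.awaitTermination()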



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ReduceByKeyAndWindow-checkpoint-recovery-issues-in-Spark-Streaming-tp29100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Joining 2 dataframes, getting result as nested list/structure in dataframe

2017-08-23 Thread JG Perrin
Hi folks,

I am trying to join 2 dataframes, but I would like to have the result as a list
of rows of the right dataframe (dDf in the example) in a column of the left
dataframe (cDf in the example). I made it work with one column, but I am having
issues adding more columns / creating a row(?).
Seq<String> joinColumns = new Set2<>("c1", "c2").toSeq();
Dataset<Row> allDf = cDf.join(dDf, joinColumns, "inner");
allDf.printSchema();
allDf.show();

Dataset<Row> aggDf = allDf.groupBy(cDf.col("c1"), cDf.col("c2"))
    .agg(collect_list(col("c50")));
aggDf.show();

Output:
+----+-------+-------------------+
|  c1|     c2|  collect_list(c50)|
+----+-------+-------------------+
|3744|1160242| [6, 5, 4, 3, 2, 1]|
|3739|1150097|                [1]|
|3780|1159902|    [5, 4, 3, 2, 1]|
| 132|1200743|       [4, 3, 2, 1]|
|3778|1183204|                [1]|
|3766|1132709|                [1]|
|3835|1146169|                [1]|
+----+-------+-------------------+

Thanks,

jg

__
This electronic transmission and any documents accompanying this electronic 
transmission contain confidential information belonging to the sender.  This 
information may contain confidential health information that is legally 
privileged.  The information is intended only for the use of the individual or 
entity named above.  The authorized recipient of this transmission is 
prohibited from disclosing this information to any other party unless required 
to do so by law or regulation and is required to delete or destroy the 
information after its stated need has been fulfilled.  If you are not the 
intended recipient, you are hereby notified that any disclosure, copying, 
distribution or the taking of any action in reliance on or regarding the 
contents of this electronically transmitted information is strictly prohibited. 
 If you have received this E-mail in error, please notify the sender and delete 
this message immediately.


Re: Training A ML Model on a Huge Dataframe

2017-08-23 Thread Sea aj
Thanks for the reply.

As far as I understood, mini-batch is not yet supported in the ML library. As
for MLlib mini-batch, I could not find any PySpark API.




On Wed, Aug 23, 2017 at 2:59 PM, Suzen, Mehmet  wrote:

> It depends on what model you would like to train but models requiring
> optimisation could use SGD with mini batches. See:
> https://spark.apache.org/docs/latest/mllib-optimization.
> html#stochastic-gradient-descent-sgd
>
> On 23 August 2017 at 14:27, Sea aj  wrote:
>
>> Hi,
>>
>> I am trying to feed a huge dataframe to a ml algorithm in Spark but it
>> crashes due to the shortage of memory.
>>
>> Is there a way to train the model on a subset of the data in multiple
>> steps?
>>
>> Thanks
>>
>>
>>
>>
>
>
>
> --
>
> Mehmet Süzen, MSc, PhD
> 
>
>


filter function works incorretly (Python)

2017-08-23 Thread AlexanderModestov
Hello All!
I'm trying to filter some rows in my DataFrame.
I created a list of ids and used the construction:
df_new = df.filter(df.user.isin(list_users))
The first DataFrame (df) consists of 29711562 rows, but the new one has only
5394805.
OK, so I decided to use another method:
df_new = df.join(df_ids, df.user == df_ids.user, how='inner')
df_ids is a DataFrame whose rows are the ids (the ids are unique). I wanted to
find the common set of ids this way, but again I got a new DataFrame that is
bigger than the previous one.
Maybe someone knows the right way to implement this?
Thank you!
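
A condensed sketch of the two attempts, plus a left-semi join and a
dropDuplicates() cross-check (names follow the description above; if the inner
join grows, duplicate user values in df_ids are the usual suspect):

df_new_filter = df.filter(df.user.isin(list_users))

df_ids_unique = df_ids.dropDuplicates(['user'])
df_new_semi = df.join(df_ids_unique, on='user', how='left_semi')

print(df.count(), df_new_filter.count(), df_new_semi.count())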



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/filter-function-works-incorretly-Python-tp29099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Training A ML Model on a Huge Dataframe

2017-08-23 Thread Suzen, Mehmet
It depends on what model you would like to train, but models requiring
optimisation could use SGD with mini-batches. See:
https://spark.apache.org/docs/latest/mllib-optimization.html#stochastic-gradient-descent-sgd
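
In PySpark that looks roughly like the sketch below (untested; the data path and
parsing are placeholders, and miniBatchFraction is the knob that gives you
mini-batches):

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

def parse(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile('hdfs:///data/train.csv').map(parse)  # assumes an existing SparkContext `sc`

model = LinearRegressionWithSGD.train(
    data,
    iterations=100,
    step=0.01,
    miniBatchFraction=0.1)  # each SGD step samples ~10% of the data
print(model.weights)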

On 23 August 2017 at 14:27, Sea aj  wrote:

> Hi,
>
> I am trying to feed a huge dataframe to a ml algorithm in Spark but it
> crashes due to the shortage of memory.
>
> Is there a way to train the model on a subset of the data in multiple
> steps?
>
> Thanks
>
>
>
>



-- 

Mehmet Süzen, MSc, PhD


| PRIVILEGED AND CONFIDENTIAL COMMUNICATION This e-mail transmission, and
any documents, files or previous e-mail messages attached to it, may
contain confidential information that is legally privileged. If you are not
the intended recipient or a person responsible for delivering it to the
intended recipient, you are hereby notified that any disclosure, copying,
distribution or use of any of the information contained in or attached to
this transmission is STRICTLY PROHIBITED within the applicable law. If you
have received this transmission in error, please: (1) immediately notify me
by reply e-mail to su...@acm.org,  and (2) destroy the original
transmission and its attachments without reading or saving in any manner. |


Training A ML Model on a Huge Dataframe

2017-08-23 Thread Sea aj
Hi,

I am trying to feed a huge dataframe to an ML algorithm in Spark, but it
crashes due to a shortage of memory.

Is there a way to train the model on a subset of the data in multiple steps?

Thanks





RE: A bug in spark or hadoop RPC with kerberos authentication?

2017-08-23 Thread Sun, Keith
Thanks for the reply. I filed an issue in JIRA:
https://issues.apache.org/jira/browse/SPARK-21819

I submitted the job from the Java API, not via the spark-submit command line, as we
want to offer Spark processing as a service.

Configuration hc = new Configuration(false);
String yarnxml = String.format("%s/%s", ConfigLocation, "yarn-site.xml");
String corexml = String.format("%s/%s", ConfigLocation, "core-site.xml");
String hdfsxml = String.format("%s/%s", ConfigLocation, "hdfs-site.xml");
String hivexml = String.format("%s/%s", ConfigLocation, "hive-site.xml");

hc.addResource(yarnxml);
hc.addResource(corexml);
hc.addResource(hdfsxml);
hc.addResource(hivexml);

// manually set all the Hadoop config in the SparkConf
SparkConf sc = new SparkConf(true);
hc.forEach(entry -> {
    if (entry.getKey().startsWith("hive")) {
        sc.set(entry.getKey(), entry.getValue());
    } else {
        sc.set("spark.hadoop." + entry.getKey(), entry.getValue());
    }
});

UserGroupInformation.setConfiguration(hc);
UserGroupInformation.loginUserFromKeytab(Principal, Keytab);

SparkSession sparkSessesion = SparkSession
    .builder()
    .master("yarn-client") // "yarn-client", "local"
    .config(sc)
    .appName(SparkEAZDebug.class.getName())
    .enableHiveSupport()
    .getOrCreate();


Thanks very much.
Keith

From: 周康 [mailto:zhoukang199...@gmail.com]
Sent: August 22, 2017, 20:22
To: Sun, Keith 
Cc: user@spark.apache.org
Subject: Re: A bug in spark or hadoop RPC with kerberos authentication?

You can check out the Hadoop credential classes in Spark's YARN module. During spark-submit, it
will use the config on the classpath.
I wonder how you reference your own config?


RE: A bug in spark or hadoop RPC with kerberos authentication?

2017-08-23 Thread Sun, Keith
Finally found the root cause and raised a bug issue at
https://issues.apache.org/jira/browse/SPARK-21819



Thanks very much.
Keith

From: Sun, Keith
Sent: August 22, 2017, 8:48
To: user@spark.apache.org
Subject: A bug in spark or hadoop RPC with kerberos authentication?

Hello,

I met a very weird issue which, while easy to reproduce, kept me stuck for more
than a day. I suspect this may be an issue/bug related to the class loader.
Can you help confirm the root cause?

I want to specify a customized Hadoop configuration set instead of the one on the
classpath (we have a few Hadoop clusters, all with Kerberos security, and I
want to support different configurations per cluster).
Code/error below.


The workaround I found is to place a core-site.xml with the two properties below
on the classpath.
By checking the RPC code under org.apache.hadoop.ipc.RPC, I suspect the RPC
code may not see the UGI class in the same classloader.
So UGI is initialized with the default value from the classpath, which is simple
authentication.

core-site.xml with the security setup on the classpath:


<configuration>
  <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
  </property>
  <property>
    <name>hadoop.security.authorization</name>
    <value>true</value>
  </property>
</configuration>




error--
2673 [main] DEBUG 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil  - 
DataTransferProtocol using SaslPropertiesResolver, configured QOP 
dfs.data.transfer.protection = privacy, configured class 
dfs.data.transfer.saslproperties.resolver.class = class 
org.apache.hadoop.security.WhitelistBasedResolver
2696 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service: 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED
2744 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - 
PrivilegedAction as:x@xxxCOM (auth:KERBEROS) 
from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:136) //
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC  - Creating YarnRPC for 
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC  - Creating a 
HadoopYarnProtoRpc proxy for protocol interface 
org.apache.hadoop.yarn.api.ApplicationClientProtocol
2801 [main] DEBUG org.apache.hadoop.ipc.Client  - getting client out of cache: 
org.apache.hadoop.ipc.Client@748fe51d
2981 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
3004 [main] DEBUG org.apache.hadoop.ipc.Client  - The ping interval is 6 ms.
3005 [main] DEBUG org.apache.hadoop.ipc.Client  - Connecting to 
yarn-rm-1/x:8032
3019 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx: starting, having 
connections 1
3020 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client  - 
IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx 
sending #0
3025 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx got value #-1
3026 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - closing ipc connection to 
yarn-rm-1/x:8032: SIMPLE authentication is not enabled.  Available:[TOKEN, 
KERBEROS]
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
at 
org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1131)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
---code---

Configuration hc = new Configuration(false);

hc.addResource("myconf/yarn-site.xml");
hc.addResource("myconf/core-site.xml");
hc.addResource("myconf/hdfs-site.xml");
hc.addResource("myconf/hive-site.xml");

SparkConf sc = new SparkConf(true);
// add config in spark conf, as no xml is on the classpath except the
// "default.xml" files from the Hadoop jars.
hc.forEach(entry -> {
    if (entry.getKey().startsWith("hive")) {
        sc.set(entry.getKey(), entry.getValue());
    } else {
        sc.set("spark.hadoop." + entry.getKey(), entry.getValue());
    }
});

UserGroupInformation.setConfiguration(hc);
UserGroupInformation.loginUserFromKeytab(Principal, Keytab);

System.out.println("spark-conf##");
System.out.println(sc.toDebugString());

SparkSession sparkSessesion = SparkSession
    .builder()
    .master("yarn-client") // "yarn-client", "local"
    .config(sc)
    .appName(SparkEAZDebug.class.getName())
    .enableHiveSupport()
    .getOrCreate();

Spark billing on shared Clusters

2017-08-23 Thread Jorge Machado

Hi everyone, 

I was wondering how it is possible to do Spark / YARN accounting on a shared
cluster based on resource usage. I found out that there is no way to do that.

So I developed hbilling to deal with this. Is anyone interested in a quick demo
or so?

More info at: www.hbilling.io

Example: 

Dom is an enterprise architect at a company called SuperX. SuperX has a new Hadoop
cluster up and running, with four departments sharing the hardware. Now Dom
wants to charge each department by cluster usage. After some research he
finds out that hbilling is the first software that addresses exactly this
problem.



Best Regards

Jorge Machado
www.jmachado.me 
jo...@jmachado.me