Re: Apache Spark MLIB

2017-02-24 Thread Jon Gregg
Here's a high level overview of Spark's ML Pipelines around when it came
out: https://www.youtube.com/watch?v=OednhGRp938.

But reading your description, you might be able to build a basic version of
this without ML.  Spark has broadcast variables
<http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables>
that
would allow you to put flagged IP ranges into an array and make that
available on every node.  Then you can run filters to detect users who've
logged in from a flagged IP range.
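
A rough sketch of that idea (Scala here for brevity, though the same
broadcast/filter API exists in Java; the paths and field layout are made up,
and real IP-range checks would need a bit more logic than a plain set lookup):

// Flagged IPs, collected to the driver and broadcast to every executor.
val flaggedIPs = sc.textFile("/path/to/flagged_ips.csv").collect().toSet
val flaggedBC = sc.broadcast(flaggedIPs)

// Login records as (user, ip) pairs.
val logins = sc.textFile("/path/to/logins.csv")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1)))

// Keep only logins that came from a flagged IP.
val anomalies = logins.filter { case (_, ip) => flaggedBC.value.contains(ip) }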

Jon Gregg

On Thu, Feb 23, 2017 at 9:19 PM, Mina Aslani <aslanim...@gmail.com> wrote:

> Hi,
>
> I am going to start working on anomaly detection using Spark MLIB. Please
> note that I have not used Spark so far.
>
> I would like to read some data and if a user logged in from different ip
> address which is not common consider it as an anomaly, similar to what
> apple/google does.
>
> My preferred language of programming is JAVA.
>
> I am wondering if you can let me know about books/workshops which guide me
> on the ML algorithm to use and how to implement. I would like to know about
> the Spark supervised/unsupervised options and the suggested algorithm.
>
> I really appreciate if you share you thoughts/experience/insight with me.
>
> Best regards,
> Mina
>


Re: Spark executors in streaming app always uses 2 executors

2017-02-22 Thread Jon Gregg
Spark offers a receiver-based approach and a direct approach with Kafka (
https://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html),
and a note in the receiver-based approach says that topic partitions in Kafka
do not correlate to partitions of RDDs generated in Spark Streaming.

A fix might be as simple as switching to the direct approach
<https://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers>?
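
A minimal sketch of the direct approach with the 0-8 integration (the broker
list and topic name are placeholders, and ssc is your existing
StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my_topic")

// One RDD partition per Kafka partition (8 in your case), no long-running receivers.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// The repartition(500) / map / save-to-Cassandra logic can stay the same downstream.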

Jon Gregg

On Wed, Feb 22, 2017 at 12:37 AM, satishl <satish.la...@gmail.com> wrote:

> I am reading from a kafka topic which has 8 partitions. My spark app is
> given
> 40 executors (1 core per executor). After reading the data, I repartition
> the dstream by 500, map it and save it to cassandra.
> However, I see that only 2 executors are being used per batch, even though
> I see 500 tasks for the stage; all of them are sequentially scheduled on the 2
> executors picked. My spark concepts are still forming and I'm missing
> something obvious.
> I expected that 8 executors will be picked for reading data from the 8
> partitions in kafka, and then with the repartition this data will be
> distributed between 40 executors and then saved to cassandra.
> How should I think about this?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-executors-in-streaming-app-
> always-uses-2-executors-tp28413.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Query data in subdirectories in Hive Partitions using Spark SQL

2017-02-18 Thread Jon Gregg
Spark has partition discovery if your data is laid out in a
parquet-friendly directory structure:
http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery

You can also use wildcards to get subdirectories (I'm using spark 1.6 here)
>>
data2 = sqlContext.read.load("/my/data/parquetTable/*", "parquet") # gets
all subdirectories
>>

Another
option would be to CREATE a Hive table on top of your data that uses
PARTITIONED BY to identify the subdirectories, and then use Spark SQL to
query that Hive table.  There might be a cleaner way to do this in Spark
2.0+ but that is a common pattern for me in Spark 1.6 when I know the
directory structure but don't have "=" signs in the paths.
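
A rough sketch of that pattern in Spark 1.6 (the table, column names, and
paths are made up, so the DDL will need adjusting to your schema):

val hc = new org.apache.spark.sql.hive.HiveContext(sc)

hc.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS my_parquet_table (id BIGINT, value STRING)
  PARTITIONED BY (dt STRING)
  STORED AS PARQUET
  LOCATION '/my/data/parquetTable'
""")

// Point each partition at its subdirectory; no '=' needed in the path itself.
hc.sql("ALTER TABLE my_parquet_table ADD IF NOT EXISTS PARTITION (dt='2017-02-18') " +
  "LOCATION '/my/data/parquetTable/2017-02-18'")

// Then query it through Spark SQL as usual.
val df = hc.sql("SELECT * FROM my_parquet_table WHERE dt = '2017-02-18'")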

Jon Gregg

On Fri, Feb 17, 2017 at 7:02 PM, 颜发才(Yan Facai) <facai@gmail.com> wrote:

> Hi, Abdelfatah,
> How to you read these files? spark.read.parquet or spark.sql?
> Could you show some code?
>
>
> On Wed, Feb 15, 2017 at 8:47 PM, Ahmed Kamal Abdelfatah <
> ahmed.abdelfa...@careem.com> wrote:
>
>> Hi folks,
>>
>>
>>
>> How can I force spark sql to recursively get data stored in parquet
>> format from subdirectories ?  In Hive, I could achieve this by setting few
>> Hive configs.
>>
>>
>>
>> set hive.input.dir.recursive=true;
>>
>> set hive.mapred.supports.subdirectories=true;
>>
>> set hive.supports.subdirectories=true;
>>
>> set mapred.input.dir.recursive=true;
>>
>>
>>
>> I tried to set these configs through spark sql queries but I get 0
>> records all the times compared to hive which get me the expected results. I
>> also put these confs in hive-site.xml file but nothing changed. How can I
>> handle this issue ?
>>
>>
>>
>> Spark Version : 2.1.0
>>
>> I used Hive 2.1.1  on emr-5.3.1
>>
>>
>>
>> *Regards, *
>>
>>
>>
>>
>> *Ahmed Kamal*
>> *MTS in Data Science*
>>
>> *Email: **ahmed.abdelfa...@careem.com <ahmed.abdelfa...@careem.com>*
>>
>>
>>
>>
>>
>
>


Re: skewed data in join

2017-02-17 Thread Jon Gregg
It depends how you salt it.  See slide 40 and onwards from a Spark Summit
talk here:
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications
The speakers use a mod-8 integer salt appended to the end of the key; the
salt that works best for you might be different.
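
As a rough illustration of the idea with DataFrames (the column names, the
salt width, and which side is skewed are all assumptions, and this assumes a
Spark 2.1+ session named spark):

import org.apache.spark.sql.functions._
import spark.implicits._

val numSalts = 8  // tune to how concentrated the hot keys are

// Add a random salt to the big, skewed side.
val bigSalted = bigDF.withColumn("salt", (rand() * numSalts).cast("int"))

// Replicate the small side once per salt value so every (key, salt) pair can match.
val smallSalted = smallDF.crossJoin((0 until numSalts).toDF("salt"))

// Join on the original key plus the salt, then drop the salt.
val joined = bigSalted.join(smallSalted, Seq("key", "salt")).drop("salt")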

On Thu, Feb 16, 2017 at 12:40 PM, Gourav Sengupta  wrote:

> Hi,
>
> Thanks for your kind response. The hash key using random numbers increases
> the time for processing the data. My entire join for the entire month
> finishes within 150 seconds for 471 million records and then stays for
> another 6 mins for 55 million records.
>
> Using hash keys increases the processing time to 11 mins. Therefore I am
> not quite clear why should I do that. The overall idea was to ensure that
> the entire processing of around 520 million records in may be another 10
> seconds more.
>
>
>
> Regards,
> Gourav Sengupta
>
> On Thu, Feb 16, 2017 at 4:54 PM, Anis Nasir  wrote:
>
>> You can also so something similar to what is mentioned in [1].
>>
>> The basic idea is to use two hash functions for each key and assigning it
>> to the least loaded of the two hashed worker.
>>
>> Cheers,
>> Anis
>>
>>
>> [1]. https://melmeric.files.wordpress.com/2014/11/the-power-of-
>> both-choices-practical-load-balancing-for-distributed-
>> stream-processing-engines.pdf
>>
>>
>> On Fri, 17 Feb 2017 at 01:34, Yong Zhang  wrote:
>>
>>> Yes. You have to change your key, or as BigData term, "adding salt".
>>>
>>>
>>> Yong
>>>
>>> --
>>> *From:* Gourav Sengupta 
>>> *Sent:* Thursday, February 16, 2017 11:11 AM
>>> *To:* user
>>> *Subject:* skewed data in join
>>>
>>> Hi,
>>>
>>> Is there a way to do multiple reducers for joining on skewed data?
>>>
>>> Regards,
>>> Gourav
>>>
>>
>


Re: notebook connecting Spark On Yarn

2017-02-15 Thread Jon Gregg
Could you just make Hadoop's resource manager (port 8088) available to your
users, and they can check available containers that way if they see the
launch is stalling?

Another option is to reduce the default # of executors and memory per
executor in the launch script to some small fraction of your cluster size,
and make it so users can manually ask for more if they need to.  It doesn't
take a whole lot of workers/memory to build most of your spark code off a
sample.
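
For example, if your kernels are launched through PYSPARK_SUBMIT_ARGS (or a
similar setting in the kernel spec), keeping the defaults small might look
roughly like this, with the numbers as placeholders to tune:

PYSPARK_SUBMIT_ARGS="--master yarn --deploy-mode client --num-executors 2 --executor-memory 2G --executor-cores 1 pyspark-shell"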

Jon

On Wed, Feb 15, 2017 at 6:41 AM, Sachin Aggarwal  wrote:

> Hi,
>
> I am trying to create multiple notebooks connecting to spark on yarn.
> After starting few jobs my cluster went out of containers. All new notebook
> request are in busy state as Jupyter kernel gateway is not getting any
> containers for master to be started.
>
> Some job are not leaving the containers for approx 10-15 mins. so user is
> not able to figure out what is wrong, why his kernel is still in busy state
>
> Is there any property or hack by which I can return valid response to
> users that there are no containers left.
>
> can I label/mark few containers for master equal to max kernel execution I
> am allowing in my cluster. so that if new kernel starts he will at least
> one container for master. it can be dynamic on priority based. if there is
> no container left then yarn can preempt some containers and provide them to
> new requests.
>
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>


Re: Order of rows not preserved after cache + count + coalesce

2017-02-13 Thread Jon Gregg
Spark has a zipWithIndex function for RDDs (
http://stackoverflow.com/a/26081548) that pairs each element with its index
right after you create the RDD, and I believe it preserves order.  Then you
can sort by the index after the cache step.

I haven't tried this with a Dataframe but this answer seems promising:
http://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex
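
A rough sketch of that approach on the RDD side (Scala here, but the same
calls exist in PySpark; this assumes df is the DataFrame from your example
and that the original read order is the order you care about):

// Attach an index to each row while the RDD is still in its original order.
val indexed = df.rdd.zipWithIndex()   // RDD of (Row, Long) pairs

indexed.cache()
indexed.count()

// Recover the original order after caching/coalescing by sorting on the index.
val restored = indexed.sortBy(_._2).map(_._1)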



On Mon, Feb 13, 2017 at 8:34 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> RDDs and DataFrames do not guarantee any specific ordering of data. They
> are like tables in a SQL database. The only way to get a guaranteed
> ordering of rows is to explicitly specify an orderBy() clause in your
> statement. Any ordering you see otherwise is incidental.
> ​
>
> On Mon, Feb 13, 2017 at 7:52 AM David Haglund (external) <
> david.hagl...@husqvarnagroup.com> wrote:
>
>> Hi,
>>
>>
>>
>> I found something that surprised me, I expected the order of the rows to
>> be preserved, so I suspect this might be a bug. The problem is illustrated
>> with the Python example below:
>>
>>
>>
>> In [1]:
>>
>> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>>
>> df.cache()
>>
>> df.count()
>>
>> df.coalesce(2).rdd.glom().collect()
>>
>> Out[1]:
>>
>> [[Row(n=1)], [Row(n=0), Row(n=2)]]
>>
>>
>>
>> Note how n=1 comes before n=0, above.
>>
>>
>>
>>
>>
>> If I remove the cache line I get the rows in the correct order and the
>> same if I use df.rdd.count() instead of df.count(), see examples below:
>>
>>
>>
>> In [2]:
>>
>> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>>
>> df.count()
>>
>> df.coalesce(2).rdd.glom().collect()
>>
>> Out[2]:
>>
>> [[Row(n=0)], [Row(n=1), Row(n=2)]]
>>
>>
>>
>> In [3]:
>>
>> df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
>>
>> df.cache()
>>
>> df.rdd.count()
>>
>> df.coalesce(2).rdd.glom().collect()
>>
>> Out[3]:
>>
>> [[Row(n=0)], [Row(n=1), Row(n=2)]]
>>
>>
>>
>>
>>
>> I use spark 2.1.0 and pyspark.
>>
>>
>>
>> Regards,
>>
>> /David
>>
>> The information in this email may be confidential and/or legally
>> privileged. It has been sent for the sole use of the intended recipient(s).
>> If you are not an intended recipient, you are strictly prohibited from
>> reading, disclosing, distributing, copying or using this email or any of
>> its contents, in any way whatsoever. If you have received this email in
>> error, please contact the sender by reply email and destroy all copies of
>> the original message. Please also be advised that emails are not a secure
>> form for communication, and may contain errors.
>>
>


Re: Lost executor 4 Container killed by YARN for exceeding memory limits.

2017-02-13 Thread Jon Gregg
Setting Spark's memoryOverhead configuration variable is recommended in
your logs, and has helped me with these issues in the past.  Search for
"memoryOverhead" here:
http://spark.apache.org/docs/latest/running-on-yarn.html

That said, you're running on a huge cluster as it is.  If it's possible to
filter your tables down before the join (keeping just the rows/columns you
need), that may be a better solution.
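
For example, something like this added to your launch command (the 2048 MB
value is just a starting point to tune):

spark-shell --master yarn --deploy-mode client --driver-memory 16G \
  --num-executors 500 --executor-cores 7 --executor-memory 10G \
  --conf spark.yarn.executor.memoryOverhead=2048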

Jon

On Mon, Feb 13, 2017 at 5:27 AM, nancy henry 
wrote:

> Hi All,,
>
> I am getting below error while I am trying to join 3 tables which are in
> ORC format in hive from 5 10gb tables through hive context in spark
>
> Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
> physical memory used. Consider boosting spark.yarn.executor.
> memoryOverhead.
> 17/02/13 02:21:19 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 11.1 GB of 11 GB
> physical memory used
>
>
> I am using below memory parameters to launch shell .. what else i could
> increase from these parameters or do I need to change any configuration
> settings please let me know
>
> spark-shell --master yarn --deploy-mode client --driver-memory 16G
> --num-executors 500 executor-cores 7 --executor-memory 10G
>
>


Re: does persistence required for single action ?

2017-02-08 Thread Jon Gregg
Hard to say without more context around where your job is stalling, what
file sizes you're working with etc.

Best answer would be to test and see, but in general for simple DAGs, I
find that not persisting anything typically runs the fastest. If I persist
anything it would be rdd6 because it took some processing to create and I
might want to use rdd6 for more analyses in the future.
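
If you did want to keep rdd6 around for later work, the sketch is just
(someTransformation standing in for whatever your foreach does):

rdd6.cache()                        // or persist() with an explicit StorageLevel
rdd6.foreach(someTransformation)    // the first action populates the cache
// later actions on rdd6 reuse the cached data instead of recomputing the DAG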

Jon

On Wed, Feb 8, 2017 at 1:40 AM, Jörn Franke  wrote:

> Depends on the use case, but a persist before checkpointing can make sense
> after some of the map steps.
>
> On 8 Feb 2017, at 03:09, Shushant Arora  wrote:
>
> Hi
>
> I have a workflow like below:
>
> rdd1 = sc.textFile(input);
> rdd2 = rdd1.filter(filterfunc1);
> rdd3 = rdd1.filter(fiterfunc2);
> rdd4 = rdd2.map(mapptrans1);
> rdd5 = rdd3.map(maptrans2);
> rdd6 = rdd4.union(rdd5);
> rdd6.foreach(some transformation);
>
> 
>
>
>
>
>1. Do I need to persist rdd1 ?Or its not required since there is only
>one action at rdd6 which will create only one job and in a single job no
>need of persist ?
>2. Also what if transformation on rdd2 is reduceByKey instead of map ?
>Will this again the same thing no need of persist since single job.
>
>
> Thanks
>
>


Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-06 Thread Jon Gregg
Strange that it's working for some directories but not others.  Looks like
wholeTextFiles maybe doesn't work with S3?
https://issues.apache.org/jira/browse/SPARK-4414 .

If it's possible to load the data into EMR and run Spark from there that
may be a workaround.  This blog post shows a python workaround that might
work as well:
http://michaelryanbell.com/processing-whole-files-spark-s3.html

Jon


On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay 
wrote:

> I've actually been able to trace the problem to the files being read in.
> If I change to a different directory, then I don't get the error. Is one of
> the executors running out of memory?
>
>
>
>
>
> On 02/06/2017 02:35 PM, Paul Tremblay wrote:
>
>> When I try to create an rdd using wholeTextFiles, I get an
>> incomprehensible error. But when I use the same path with sc.textFile, I
>> get no error.
>>
>> I am using pyspark with spark 2.1.
>>
>> in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/148069
>> 8542939.6/warc/
>>
>> rdd = sc.wholeTextFiles(in_path)
>>
>> rdd.take(1)
>>
>>
>> /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
>>1341
>>1342 p = range(partsScanned, min(partsScanned +
>> numPartsToTry, totalParts))
>> -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
>>1344
>>1345 items += res
>>
>> /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd,
>> partitionFunc, partitions, allowLocal)
>> 963 # SparkContext#runJob.
>> 964 mappedRDD = rdd.mapPartitions(partitionFunc)
>> --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(),
>> mappedRDD._jrdd, partitions)
>> 966 return list(_load_from_socket(port,
>> mappedRDD._jrdd_deserializer))
>> 967
>>
>> /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in
>> __call__(self, *args)
>>1131 answer = self.gateway_client.send_command(command)
>>1132 return_value = get_return_value(
>> -> 1133 answer, self.gateway_client, self.target_id,
>> self.name)
>>1134
>>1135 for temp_arg in temp_args:
>>
>> /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>>  61 def deco(*a, **kw):
>>  62 try:
>> ---> 63 return f(*a, **kw)
>>  64 except py4j.protocol.Py4JJavaError as e:
>>  65 s = e.java_exception.toString()
>>
>> /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in
>> get_return_value(answer, gateway_client, target_id, name)
>> 317 raise Py4JJavaError(
>> 318 "An error occurred while calling
>> {0}{1}{2}.\n".
>> --> 319 format(target_id, ".", name), value)
>> 320 else:
>> 321 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling
>> z:org.apache.spark.api.python.PythonRDD.runJob.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in
>> stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor
>> 8): ExecutorLostFailure (executor 8 exited caused by one of the running
>> tasks) Reason: Container marked as failed: 
>> container_1486415078210_0005_01_16
>> on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52.
>> Diagnostics: Exception from container-launch.
>> Container id: container_1486415078210_0005_01_16
>> Exit code: 52
>> Stack trace: ExitCodeException exitCode=52:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
>> at org.apache.hadoop.util.Shell.run(Shell.java:479)
>> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Sh
>> ell.java:773)
>> at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerEx
>> ecutor.launchContainer(DefaultContainerExecutor.java:212)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.l
>> auncher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.l
>> auncher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> rdd = sc.textFile(in_path)
>>
>> In [8]: rdd.take(1)
>> Out[8]: [u'WARC/1.0']
>>
>>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Cannot read Hive Views in Spark SQL

2017-02-06 Thread Jon Gregg
Confirming that Spark can read newly created views - I just created a test
view in Hive and was able to query it in Spark 1.5 immediately afterward
without a refresh.  Possibly an issue with your Spark-Hive connection?

Jon

On Sun, Feb 5, 2017 at 9:31 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi Khan,
>
> It didn't work in my case. used below code. View is already present in
> Hive but I cant read that in spark sql. Throwing exception that table not
> found
>
> sqlCtx.refreshTable("schema.hive_view")
>
>
> Thanks,
>
> Asmath
>
>
> On Sun, Feb 5, 2017 at 7:56 PM, vaquar khan  wrote:
>
>> Hi Ashmath,
>>
>> Try  refresh table
>>
>> // spark is an existing SparkSession
>> spark.catalog.refreshTable("my_table")
>>
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.ht
>> ml#metadata-refreshing
>>
>>
>>
>> Regards,
>> Vaquar khan
>>
>>
>>
>> On Sun, Feb 5, 2017 at 7:19 PM, KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a hive view which is basically set of select statements on some
>>> tables. I want to read the hive view and use hive builtin functions
>>> available in spark sql.
>>>
>>> I am not able to read that hive view in spark sql but can retreive data
>>> in hive shell.
>>>
>>> can't spark access hive views?
>>>
>>> Thanks,
>>> Asmath
>>>
>>
>>
>>
>> --
>> Regards,
>> Vaquar Khan
>> +1 -224-436-0783 <(224)%20436-0783>
>>
>> IT Architect / Lead Consultant
>> Greater Chicago
>>
>
>


Re: Starting a new Spark codebase, Python or Scala / Java?

2016-11-21 Thread Jon Gregg
Spark is written in Scala, so yes it's still the strongest option.  You
also get the Dataset type with Scala (compile-time type safety), which
isn't available in Python.
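
As a small illustration of what the Dataset buys you (the case class and
path are made up, but the pattern is standard):

case class Login(user: String, ip: String, ts: Long)

import spark.implicits._
val logins = spark.read.json("/path/to/logins.json").as[Login]  // typed Dataset[Login]

// Field names and types are checked at compile time:
val ips = logins.filter(_.user == "alice").map(_.ip)
// logins.map(_.usr)  // would fail to compile rather than at runtime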

That said, I think the Python API is a viable candidate if you use Pandas
for Data Science.  There are similarities between the DataFrame and Pandas
APIs, and you can convert a Spark DataFrame to a Pandas DataFrame.

On Mon, Nov 21, 2016 at 1:51 PM, Brandon White 
wrote:

> Hello all,
>
> I will be starting a new Spark codebase and I would like to get opinions
> on using Python over Scala. Historically, the Scala API has always been the
> strongest interface to Spark. Is this still true? Are there still many
> benefits and additional features in the Scala API that are not available in
> the Python API? Are there any performance concerns using the Python API
> that do not exist when using the Scala API? Anything else I should know
> about?
>
> I appreciate any insight you have on using the Scala API over the Python
> API.
>
> Brandon
>


Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread Jon Gregg
In these cases it might help to just flatten the DataFrame.  Here's a
helper function from the tutorial (scroll down to the "Flattening" header):
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/02%20Introduction%20to%20DataFrames%20-%20scala.html
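
In your case, since you only need one field out of the struct, a direct
select with explode may be enough (a sketch against the schema you posted):

import org.apache.spark.sql.functions.explode

val flattened = ds2
  .select($"name", explode($"addresses").as("address"))
  .select($"name", $"address.city")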



On Sun, Nov 20, 2016 at 1:24 PM, pandees waran  wrote:

> have you tried using "." access method?
>
> e.g:
> ds1.select("name","addresses[0].element.city")
>
> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande  > wrote:
>
>> The following my dataframe schema
>>
>> root
>>  |-- name: string (nullable = true)
>>  |-- addresses: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- street: string (nullable = true)
>>  |||-- city: string (nullable = true)
>>
>> I want to output name and city. The following is my spark streaming app
>> which outputs name and addresses, but I want name and cities in the output.
>>
>> object PersonConsumer {
>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>   import com.example.protos.demo._
>>
>>   def main(args : Array[String]) {
>>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val ds1 = spark.readStream.format("kafka").
>>   option("kafka.bootstrap.servers","localhost:9092").
>>   option("subscribe","person").load()
>>
>> val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value"
>> )).map(Person.parseFrom(_)).select($"name", $"addresses")
>>
>> ds2.printSchema()
>>
>> val query = ds2.writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> query.awaitTermination()
>>   }
>> }
>>
>> Appreciate your help. Thanks.
>>
>
>
>
> --
> Thanks,
> Pandeeswaran
>


Re: Spark AVRO S3 read not working for partitioned data

2016-11-17 Thread Jon Gregg
Making a guess here: you need to add s3:ListBucket?

http://stackoverflow.com/questions/35803808/spark-saveastextfile-to-s3-fails


On Thu, Nov 17, 2016 at 2:11 PM, Jain, Nishit 
wrote:

> When I read a specific file it works:
>
> val filePath= "s3n://bucket_name/f1/f2/avro/dt=2016-10-19/hr=19/00"
> val df = spark.read.avro(filePath)
>
> But if I point to a folder to read date partitioned data it fails:
>
> val filePath="s3n://bucket_name/f1/f2/avro/dt=2016-10-19/"
>
> I get this error:
>
> Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: 
> org.jets3t.service.S3ServiceException: S3 HEAD request failed for 
> '/f1%2Ff2%2Favro%2Fdt%3D2016-10-19' - ResponseCode=403, 
> ResponseMessage=Forbidden
> at 
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleServiceException(Jets3tNativeFileSystemStore.java:245)
> at 
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:119)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at org.apache.hadoop.fs.s3native.$Proxy7.retrieveMetadata(Unknown Source)
> at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
> at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:374)
> at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
> at 
> com.databricks.spark.avro.package$AvroDataFrameReader$$anonfun$avro$2.apply(package.scala:34)
> at 
> com.databricks.spark.avro.package$AvroDataFrameReader$$anonfun$avro$2.apply(package.scala:34)
> at BasicS3Avro$.main(BasicS3Avro.scala:55)
> at BasicS3Avro.main(BasicS3Avro.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
>
> Am I missing anything?
>
>
>


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Jon Gregg
Since you're completely new to Kafka, I would start with the Kafka docs (
https://kafka.apache.org/documentation).  You should be able to get through
the Getting Started part easily and there are some examples for setting up
a basic Kafka server.

You don't need Kafka to start working with Spark Streaming (there are
examples online that pull directly from Twitter, for example).  But at a high
level, if you're sending data from one server to another, it can help to send
the messages to a distributed queue first for durable storage (so data doesn't
get lost in transmission), among other benefits.

On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq  wrote:

> Hi Karim,
>
> Are you looking for something specific? Some information about your
> usecase would be really  helpful in order to answer your question.
>
>
> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi All,
>>
>> I am completely new with Kafka. I was wondering if somebody could provide
>> me some guidelines on how to develop real-time streaming applications using
>> Spark Streaming API with Kafka.
>>
>> I am aware the Spark Streaming  and Kafka integration [1]. However, a
>> real life example should be better to start?
>>
>>
>>
>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>
>>
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim* BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> 
>>
>
>
> --
>
>
> [image: http://]
>
> Tariq, Mohammad
> about.me/mti
> [image: http://]
> 
>
>
>


Re: Newbie question - Best way to bootstrap with Spark

2016-11-14 Thread Jon Gregg
Piggybacking off this - how are you guys teaching DataFrames and Datasets
to new users?  I haven't taken the edx courses but I don't see Spark SQL
covered heavily in the syllabus.  I've dug through the Databricks
documentation but it's a lot of information for a new user I think - hoping
there is a video or course option instead.

On Mon, Nov 14, 2016 at 11:13 AM, Rishikesh Teke 
wrote:

> Integrate spark with apache zeppelin  https://zeppelin.apache.org/
>    its again a very handy way to bootstrap
> with spark.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Newbie-question-Best-way-to-bootstrap-with-Spark-
> tp28032p28069.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: A number of issues when running spark-ec2

2016-04-16 Thread Jon Gregg
That link points to hadoop2.6.tgz.  I tried changing the URL to
https://s3.amazonaws.com/spark-related-packages/spark-1.6.1-bin-hadoop2.7.tgz
and I get a NoSuchKey error.

Should I just go with it even though it says hadoop2.6?

On Sat, Apr 16, 2016 at 5:37 PM, Ted Yu  wrote:

> BTW this was the original thread:
>
> http://search-hadoop.com/m/q3RTt0Oxul0W6Ak
>
> The link for spark-1.6.1-bin-hadoop2.7 is
> https://s3.amazonaws.com/spark-related-packages/spark-1.6.1-bin-hadoop2.7.tgz
> 
>
> On Sat, Apr 16, 2016 at 2:14 PM, Ted Yu  wrote:
>
>> From the output you posted:
>> ---
>> Unpacking Spark
>>
>> gzip: stdin: not in gzip format
>> tar: Child returned status 1
>> tar: Error is not recoverable: exiting now
>> ---
>>
>> The artifact for spark-1.6.1-bin-hadoop2.6 is corrupt.
>>
>> This problem has been reported in other threads.
>>
>> Try spark-1.6.1-bin-hadoop2.7 - the artifact should be good.
>>
>> On Sat, Apr 16, 2016 at 2:09 PM, YaoPau  wrote:
>>
>>> I launched a cluster with: "./spark-ec2 --key-pair my_pem --identity-file
>>> ../../ssh/my_pem.pem launch jg_spark2" and I got the "Spark standalone
>>> cluster started at http://ec2-54-88-249-255.compute-1.amazonaws.com:8080
>>> "
>>> and "Done!" success confirmations at the end.  I confirmed on EC2 that 1
>>> Master and 1 Slave were both launched and passed their status checks.
>>>
>>> But none of the Spark commands seem to work (spark-shell, pyspark, etc),
>>> and
>>> port 8080 isn't being used.  The full output from launching the cluster
>>> is
>>> below.  Any ideas what the issue is?
>>>
>>> >>
>>> launch
>>>
>>> jg_spark2/Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/plugin.py:40:
>>> PendingDeprecationWarning: the imp module is deprecated in favour of
>>> importlib; see the module's documentation for alternative uses
>>>   import imp
>>>
>>> /Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/provider.py:197:
>>> ResourceWarning: unclosed file <_io.TextIOWrapper
>>> name='/Users/jg/.aws/credentials' mode='r' encoding='UTF-8'>
>>>   self.shared_credentials.load_from_path(shared_path)
>>> Setting up security groups...
>>> Creating security group jg_spark2-master
>>> Creating security group jg_spark2-slaves
>>> Searching for existing cluster jg_spark2 in region us-east-1...
>>> Spark AMI: ami-5bb18832
>>> Launching instances...
>>> Launched 1 slave in us-east-1a, regid = r-e7d97944
>>> Launched master in us-east-1a, regid = r-d3d87870
>>> Waiting for AWS to propagate instance metadata...
>>> Waiting for cluster to enter 'ssh-ready' state
>>>
>>> Warning: SSH connection error. (This could be temporary.)
>>> Host: ec2-54-88-249-255.compute-1.amazonaws.com
>>> SSH return code: 255
>>> SSH output: b'ssh: connect to host
>>> ec2-54-88-249-255.compute-1.amazonaws.com
>>> port 22: Connection refused'
>>>
>>>
>>> ./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
>>> ResourceWarning: unclosed >> family=AddressFamily.AF_INET,
>>> type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.66', 55580),
>>> raddr=('54.239.20.1', 443)>
>>>   self.queue.pop(0)
>>>
>>>
>>> Warning: SSH connection error. (This could be temporary.)
>>> Host: ec2-54-88-249-255.compute-1.amazonaws.com
>>> SSH return code: 255
>>> SSH output: b'ssh: connect to host
>>> ec2-54-88-249-255.compute-1.amazonaws.com
>>> port 22: Connection refused'
>>>
>>>
>>> ./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
>>> ResourceWarning: unclosed >> family=AddressFamily.AF_INET,
>>> type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.66', 55760),
>>> raddr=('54.239.26.182', 443)>
>>>   self.queue.pop(0)
>>>
>>>
>>> Warning: SSH connection error. (This could be temporary.)
>>> Host: ec2-54-88-249-255.compute-1.amazonaws.com
>>> SSH return code: 255
>>> SSH output: b'ssh: connect to host
>>> ec2-54-88-249-255.compute-1.amazonaws.com
>>> port 22: Connection refused'
>>>
>>>
>>> ./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
>>> ResourceWarning: unclosed >> family=AddressFamily.AF_INET,
>>> type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.66', 55827),
>>> raddr=('54.239.20.1', 443)>
>>>   self.queue.pop(0)
>>>
>>>
>>> Warning: SSH connection error. (This could be temporary.)
>>> Host: ec2-54-88-249-255.compute-1.amazonaws.com
>>> SSH return code: 255
>>> SSH output: b'ssh: connect to host
>>> ec2-54-88-249-255.compute-1.amazonaws.com
>>> port 22: Connection refused'
>>>
>>>
>>> ./Users/jg/dev/spark-1.6.1-bin-hadoop2.6/ec2/lib/boto-2.34.0/boto/connection.py:190:
>>> ResourceWarning: unclosed >> family=AddressFamily.AF_INET,
>>> type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.1.66', 55925),
>>> raddr=('207.171.162.181', 443)>
>>>   self.queue.pop(0)
>>>
>>> Cluster is now in 

Re: Spark SQL 1.3 not finding attribute in DF

2015-12-07 Thread Jon Gregg
I'm working with a Hadoop distribution that doesn't support 1.5 yet; we'll
be able to upgrade in probably two months.  For now I'm seeing the same
issue, with Spark not recognizing an existing column name in many
hive-table-to-dataframe situations:

Py4JJavaError: An error occurred while calling o375.filter.
: org.apache.spark.sql.AnalysisException: resolved attributes *state_code*
missing from
latitude,country_code,tim_zone_desc,longitude,dma_durable_key,submarket,dma_
code,dma_desc,county,city,zip_code,*state_code*;

On Mon, Dec 7, 2015 at 3:52 PM, Davies Liu  wrote:

> Could you reproduce this problem in 1.5 or 1.6?
>
> On Sun, Dec 6, 2015 at 12:29 AM, YaoPau  wrote:
> > If anyone runs into the same issue, I found a workaround:
> >
>  df.where('state_code = "NY"')
> >
> > works for me.
> >
>  df.where(df.state_code == "NY").collect()
> >
> > fails with the error from the first post.
> >
> >
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-3-not-finding-attribute-in-DF-tp25599p25600.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Spark SQL "SELECT ... LIMIT" scans the entire Hive table?

2015-11-05 Thread Jon Gregg
Here's my code:

my_data = sqlCtx.sql("SELECT * FROM raw.site_activity_data LIMIT 2")
my_data.collect()

raw.site_activity_data is a Hive external table atop daily-partitioned
.gzip data.  When I execute the command I start seeing many of these pop up
in the logs (below is a small subset)

15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 718
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 562
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 261
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 542
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 272
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 785
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 748
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 559
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 543
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 607
15/11/05 17:56:45 INFO FileInputFormat: Total input paths to process : 695
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 336
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 449
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 509
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 567
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 544
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 418
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 568
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 716
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 0
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 265
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 235
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 227
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 551
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 256
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 0
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 271
15/11/05 17:56:46 INFO FileInputFormat: Total input paths to process : 728

Then after that the Spark job starts executing 328,785 tasks.  Why doesn't
Spark SQL just look at one input path?

On Mon, Oct 5, 2015 at 5:35 PM, Michael Armbrust 
wrote:

> It does do a take.  Run explain to make sure that is the case.  Why do you
> think its reading the whole table?
>
> On Mon, Oct 5, 2015 at 1:53 PM, YaoPau  wrote:
>
>> I'm using SqlCtx connected to Hive in CDH 5.4.4.  When I run "SELECT *
>> FROM
>> my_db.my_tbl LIMIT 5", it scans the entire table like Hive would instead
>> of
>> doing a .take(5) on it and returning results immediately.
>>
>> Is there a way to get Spark SQL to use .take(5) instead of the Hive logic
>> of
>> scanning the full table when running a SELECT?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-SELECT-LIMIT-scans-the-entire-Hive-table-tp24938.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark SQL Exception: Conf non-local session path expected to be non-null

2015-10-20 Thread Jon Gregg
1.3 on CDH 5.4.4 ... I'll take the responses to mean that the fix will
probably be a few months away for us.  Not a huge problem, but something I've
run into a number of times.

On Tue, Oct 20, 2015 at 3:01 PM, Yin Huai  wrote:

> btw, what version of Spark did you use?
>
> On Mon, Oct 19, 2015 at 1:08 PM, YaoPau  wrote:
>
>> I've connected Spark SQL to the Hive Metastore and currently I'm running
>> SQL
>> code via pyspark.  Typically everything works fine, but sometimes after a
>> long-running Spark SQL job I get the error below, and from then on I can
>> no
>> longer run Spark SQL commands.  I still do have both my sc and my sqlCtx.
>>
>> Any idea what this could mean?
>>
>> An error occurred while calling o36.sql.
>> : org.apache.spark.sql.AnalysisException: Conf non-local session path
>> expected to be non-null;
>> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
>> at
>>
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
>> at
>>
>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at
>>
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>> at
>>
>> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>> at
>>
>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
>> at
>> org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
>> at
>> org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
>> at
>>
>> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
>> at
>>
>> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>>
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>>
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at
>>
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>> at
>>
>> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>> at
>>
>> 

Re: collect() works, take() returns ImportError: No module named iter

2015-08-10 Thread Jon Gregg
We did have 2.7 on the driver, 2.6 on the edge nodes and figured that was
the issue, so we've tried many combinations since then with all three of
2.6.6, 2.7.5, and Anaconda's 2.7.10 on each node with different PATHs and
PYTHONPATHs each time.  Every combination has produced the same error.

We came across a comment on the User board saying "Since you're using YARN,
you may also need to set SPARK_YARN_USER_ENV to
PYSPARK_PYTHON=/your/desired/python/on/slave/nodes" ... I couldn't find
SPARK_YARN_USER_ENV in the Spark 1.3 docs, but we tried that as well and
couldn't get it working.
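
Concretely, that suggestion amounts to something like this (the python path
is a placeholder for whichever interpreter the nodes should use):

export PYSPARK_PYTHON=/opt/anaconda/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=/opt/anaconda/bin/python"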

We're open to trying or re-trying any other ideas.

On Mon, Aug 10, 2015 at 6:25 PM, Ruslan Dautkhanov dautkha...@gmail.com
wrote:

 There is was a similar problem reported before on this list.

 Weird python errors like this generally mean you have different
 versions of python in the nodes of your cluster. Can you check that?

 From error stack you use 2.7.10 |Anaconda 2.3.0
 while OS/CDH version of Python is probably 2.6.



 --
 Ruslan Dautkhanov

 On Mon, Aug 10, 2015 at 3:53 PM, YaoPau jonrgr...@gmail.com wrote:

 I'm running Spark 1.3 on CDH 5.4.4, and trying to set up Spark to run via
 iPython Notebook.  I'm getting collect() to work just fine, but take()
 errors.  (I'm having issues with collect() on other datasets ... but
 take()
 seems to break every time I run it.)

 My code is below.  Any thoughts?

  sc
 pyspark.context.SparkContext at 0x7ffbfa310f10
  sys.version
 '2.7.10 |Anaconda 2.3.0 (64-bit)| (default, May 28 2015, 17:02:03) \n[GCC
 4.4.7 20120313 (Red Hat 4.4.7-1)]'
  hourly = sc.textFile('tester')
  hourly.collect()
 [u'a man',
  u'a plan',
  u'a canal',
  u'panama']
  hourly = sc.textFile('tester')
  hourly.take(2)

 ---
 Py4JJavaError Traceback (most recent call
 last)
 ipython-input-15-1feecba5868b in module()
   1 hourly = sc.textFile('tester')
  2 hourly.take(2)

 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in take(self,
 num)
1223
1224 p = range(partsScanned, min(partsScanned +
 numPartsToTry, totalParts))
 - 1225 res = self.context.runJob(self, takeUpToNumLeft, p,
 True)
1226
1227 items += res

 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py in
 runJob(self, rdd, partitionFunc, partitions, allowLocal)
 841 # SparkContext#runJob.
 842 mappedRDD = rdd.mapPartitions(partitionFunc)
 -- 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
 mappedRDD._jrdd, javaPartitions, allowLocal)
 844 return list(mappedRDD._collect_iterator_through_file(it))
 845


 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 -- 538 self.target_id, self.name)
 539
 540 for temp_arg in temp_args:


 /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
 298 raise Py4JJavaError(
 299 'An error occurred while calling
 {0}{1}{2}.\n'.
 -- 300 format(target_id, '.', name), value)
 301 else:
 302 raise Py4JError(

 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.runJob.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0
 in stage 10.0 failed 4 times, most recent failure: Lost task 0.3 in stage
 10.0 (TID 47, dhd490101.autotrader.com):
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File

 /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py,
 line 101, in main
 process()
   File

 /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/worker.py,
 line 96, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File

 /opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p894.568/jars/spark-assembly-1.3.0-cdh5.4.4-hadoop2.6.0-cdh5.4.4.jar/pyspark/serializers.py,
 line 236, in dump_stream
 vs = list(itertools.islice(iterator, batch))
   File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py, line
 1220, in takeUpToNumLeft
 while taken  left:
 ImportError: No module named iter

 at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
 at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:176)
 at
 org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Jon Gregg
 :
phd40010024.na.com:8041
15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
phd40010002.na.com:8041
15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
Actor[akka.tcp://
sparkexecu...@phd40010022.na.com:29369/user/Executor#43651774] with ID 2
15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
Actor[akka.tcp://
sparkexecu...@phd40010024.na.com:12969/user/Executor#1711844295] with ID 3
15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
phd40010022.na.com:14119 with 1178.1 MB RAM
15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
phd40010024.na.com:53284 with 1178.1 MB RAM
15/02/10 12:06:29 INFO CoarseGrainedSchedulerBackend: Registered executor:
Actor[akka.tcp://
sparkexecu...@phd40010002.na.com:35547/user/Executor#-1690254909] with ID 1
15/02/10 12:06:29 INFO BlockManagerInfo: Registering block manager
phd40010002.na.com:62754 with 1178.1 MB RAM
15/02/10 12:06:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:06:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:07:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:08:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:09:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:06 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:21 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:36 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory
15/02/10 12:10:51 WARN YarnClusterScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient memory

On Fri, Feb 6, 2015 at 3:24 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 You can call collect() to pull in the contents of an RDD into the driver:

   val badIPsLines = badIPs.collect()

 On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg jonrgr...@gmail.com wrote:

 OK I tried that, but how do I convert an RDD to a Set that I can then
 broadcast and cache?

   val badIPs = sc.textFile(hdfs:///user/jon/+ badfullIPs.csv)
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)

 produces the error value getLines is not a member of
 org.apache.spark.rdd.RDD[String].

 Leaving it as an RDD and then constantly joining I think will be too slow
 for a streaming job.

 On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Jon,

 You'll need to put the file on HDFS (or whatever

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Jon Gregg
They're separate in my code, how can I combine them?  Here's what I have:

  val sparkConf = new SparkConf()
  val ssc =  new StreamingContext(sparkConf, Seconds(bucketSecs))

  val sc = new SparkContext()

On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Is the SparkContext you're using the same one that the StreamingContext
 wraps?  If not, I don't think using two is supported.

 -Sandy

 On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote:

 I'm still getting an error.  Here's my code, which works successfully
 when tested using spark-shell:

   val badIPs = sc.textFile(/user/sb/badfullIPs.csv).collect
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)


 The job looks OK from my end:

 15/02/07 18:59:58 INFO Client: Application report from ASM:

  application identifier: application_1423081782629_3861

  appId: 3861

 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

  appDiagnostics:

  appMasterHost: phd40010008.na.com

  appQueue: root.default

  appMasterRpcPort: 0

  appStartTime: 1423353581140

 * yarnAppState: RUNNING*

  distributedFinalState: UNDEFINED


 But the streaming process never actually begins.  The full log is below,
 scroll to the end for the repeated warning WARN YarnClusterScheduler:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient memory.

 I'll note that I have a different Spark Streaming app called dqd
 working successfully for a different job that uses only a StreamingContext
 and not an additional SparkContext.  But this app (called sbStreamingTv)
 uses both a SparkContext and a StreamingContext for grabbing a lookup file
 in HDFS for IP filtering. * The references to line #198 from the log
 below refers to the val badIPs =
 sc.textFile(/user/sb/badfullIPs.csv).collect line shown above, and it
 looks like Spark doesn't get beyond that point in the code.*

 Also, this job (sbStreamingTv) does work successfully using
 yarn-client, even with both a SparkContext and StreamingContext.  It looks
 to me that in yarn-cluster mode it's grabbing resources for the
 StreamingContext but not for the SparkContext.

 Any ideas?

 Jon


 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
 1177.8 MB.
 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with
 id = ConnectionManagerId(phd40010008.na.com,30129)
 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
 phd40010008.na.com:30129 with 1177.8 MB RAM
 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
 http://10.229.16.108:35183
 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
 /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
 http://phd40010008.na.com:25869
 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
 /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook
 for context org.apache.spark.SparkContext@7f38095d
 15/02/10 12:06:17 INFO ApplicationMaster: Registering the
 ApplicationMaster
 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
 containers, each with 2432 memory
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
 Any, priority: 1, capability: memory:2432, vCores:1
 15/02/10 12:06:20 INFO YarnClusterScheduler:
 YarnClusterScheduler.postStartHook done
 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
 will be overridden by the value set by the cluster manager (via
 SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(jg)
 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
 15/02/10 12:06:20 INFO

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-10 Thread Jon Gregg
(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/10 13:34:54 WARN TaskSetManager: Lost TID 2426 (task 114.0:9)
15/02/10 13:34:54 WARN TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1423593164000
not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

On Tue, Feb 10, 2015 at 1:07 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 You should be able to replace that second line with

 val sc = ssc.sparkContext
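
 For reference, a minimal sketch of that single-context setup, reusing the
 bucketSecs value and the HDFS path from the snippets in this thread (a
 sketch only, not the exact job code):

   import org.apache.spark.SparkConf
   import org.apache.spark.streaming.{Seconds, StreamingContext}

   val sparkConf = new SparkConf()
   val ssc = new StreamingContext(sparkConf, Seconds(bucketSecs))

   // reuse the SparkContext that the StreamingContext wraps
   // instead of calling new SparkContext() a second time
   val sc = ssc.sparkContext

   val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect()
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)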

 On Tue, Feb 10, 2015 at 10:04 AM, Jon Gregg jonrgr...@gmail.com wrote:

 They're separate in my code, how can I combine them?  Here's what I have:

   val sparkConf = new SparkConf()
   val ssc =  new StreamingContext(sparkConf, Seconds(bucketSecs))

   val sc = new SparkContext()

 On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Is the SparkContext you're using the same one that the StreamingContext
 wraps?  If not, I don't think using two is supported.

 -Sandy

 On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg jonrgr...@gmail.com wrote:

 I'm still getting an error.  Here's my code, which works successfully
 when tested using spark-shell:

   val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
   val badIpSet = badIPs.toSet
   val badIPsBC = sc.broadcast(badIpSet)


 The job looks OK from my end:

 15/02/07 18:59:58 INFO Client: Application report from ASM:

  application identifier: application_1423081782629_3861

  appId: 3861

 * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*

  appDiagnostics:

  appMasterHost: phd40010008.na.com

  appQueue: root.default

  appMasterRpcPort: 0

  appStartTime: 1423353581140

 * yarnAppState: RUNNING*

  distributedFinalState: UNDEFINED


 But the streaming process never actually begins.  The full log is
 below; scroll to the end for the repeated warning "WARN
 YarnClusterScheduler: Initial job has not accepted any resources; check
 your cluster UI to ensure that workers are registered and have sufficient
 memory."

 I'll note that I have a different Spark Streaming app called dqd
 working successfully for a different job that uses only a StreamingContext
 and not an additional SparkContext.  But this app (called sbStreamingTv)
 uses both a SparkContext and a StreamingContext for grabbing a lookup file
 in HDFS for IP filtering. * The references to line #198 in the log
 below refer to the val badIPs =
 sc.textFile("/user/sb/badfullIPs.csv").collect line shown above, and it
 looks like Spark doesn't get beyond that point in the code.*

 Also, this job (sbStreamingTv) does work successfully using
 yarn-client, even with both a SparkContext and StreamingContext.  It looks
 to me that in yarn-cluster mode it's grabbing resources for the
 StreamingContext but not for the SparkContext.

 Any ideas?

 Jon


 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
 1177.8 MB.
 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129
 with id = ConnectionManagerId(phd40010008.na.com,30129)
 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register
 BlockManager
 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
 phd40010008.na.com:30129 with 1177.8 MB RAM
 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
 http://10.229.16.108:35183
 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
 /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_01/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
 org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
 http://phd40010008.na.com:25869
 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
 /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
 15/02/10 12:06:17 INFO YarnClusterScheduler: Created
 YarnClusterScheduler
 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook
 for context org.apache.spark.SparkContext@7f38095d
 15/02/10 12:06:17 INFO ApplicationMaster: Registering the
 ApplicationMaster
 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3

Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-06 Thread Jon Gregg
OK I tried that, but how do I convert an RDD to a Set that I can then
broadcast and cache?

  val badIPs = sc.textFile("hdfs:///user/jon/" + "badfullIPs.csv")
  val badIPsLines = badIPs.getLines
  val badIpSet = badIPsLines.toSet
  val badIPsBC = sc.broadcast(badIpSet)

produces the error "value getLines is not a member of
org.apache.spark.rdd.RDD[String]".

Leaving it as an RDD and then constantly joining I think will be too slow
for a streaming job.
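
For reference, a sketch of the collect()-based version that the 2015-02-10
messages in this thread end up using (this assumes the CSV is small enough to
hold in driver memory; collect() plays the role that getLines plays for a
local file):

  val badIPs = sc.textFile("hdfs:///user/jon/badfullIPs.csv")
  val badIpSet = badIPs.collect().toSet
  val badIPsBC = sc.broadcast(badIpSet)

  // hypothetical use on the executors: keep only records whose IP is not flagged
  // records.filter(ip => !badIPsBC.value.contains(ip))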

On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Jon,

 You'll need to put the file on HDFS (or whatever distributed filesystem
 you're running on) and load it from there.

 -Sandy

 On Thu, Feb 5, 2015 at 3:18 PM, YaoPau jonrgr...@gmail.com wrote:

 I have a file badFullIPs.csv of bad IP addresses used for filtering.  In
 yarn-client mode, I simply read it off the edge node, transform it, and
 then
 broadcast it:

    val badIPs = fromFile(edgeDir + "badfullIPs.csv")
   val badIPsLines = badIPs.getLines
   val badIpSet = badIPsLines.toSet
   val badIPsBC = sc.broadcast(badIpSet)
   badIPs.close

 How can I accomplish this in yarn-cluster mode?

 Jon



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: how to get actual count from as long from JavaDStream ?

2014-09-30 Thread Jon Gregg
Hi Andy

I'm new to Spark and have been working with Scala rather than Java, but I see
there's a dstream() method to convert from JavaDStream to DStream.  Within
DStream
(http://people.apache.org/~pwendell/spark-1.1.0-rc4-docs/api/java/org/apache/spark/streaming/dstream/DStream.html)
there is a foreachRDD() method that allows you to do things like:

msgConvertedToDStream.foreachRDD(rdd => println("The count is: " +
rdd.count().toInt))

The syntax for the casting should be changed for Java and probably the
function argument syntax is wrong too, but hopefully there's enough there
to help.
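
A slightly fuller Scala sketch of the same idea (logs and selectINFO here are
assumed to be a DStream[String] and a String => Boolean predicate, mirroring
the pseudo code below; the Java API exposes an equivalent foreachRDD on
JavaDStream):

  val msg = logs.filter(selectINFO)

  msg.foreachRDD { rdd =>
    // foreachRDD runs on the driver once per mini batch,
    // so the count is available here as a plain local Long
    val batchCount: Long = rdd.count()
    println("The count is: " + batchCount)  // e.g. written to stdout for the python script
  }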

Jon


On Tue, Sep 30, 2014 at 3:42 PM, Andy Davidson 
a...@santacruzintegration.com wrote:

 Hi

 I have a simple streaming app. All I want to do is figure out how many
 lines I have received in the current mini batch. If numLines was a JavaRDD
 I could simply call count(). How do you do something similar in Streaming?


 Here is my pseudo code


 JavaDStream<String> msg = logs.filter(selectINFO);

 JavaDStream<Long> numLines = msg.count();


 Long totalCount = numLines ???



 Here is what I am really trying to do. I have a python script that
 generates a graph of totalCount vs time. Spark Streaming does not support
 Python. As a workaround I have a java program that does the streaming. I
 want to pass the data back to the python script. It has been suggested I can
 use rdd.pipe().


 In python I call rdd.pipe("scriptToStartJavaSteam.sh")


 All I need to do is for each mini batch figure out how to get the
 count of the current mini batch and write it to standard out. Seems like
 this should be simple.


 Maybe streams do not work the way I think? In a spark core app, I am able
 to get values like count in my driver and do whatever I want with the
 local value. With streams I know I am getting mini batches because print()
 displays the first 10 lines of my stream. I assume that print is somehow
 executed in my driver, so data was sent from the workers back to
 the driver.


 Any comments or suggestions would be greatly appreciated.


 Andy


 P.s. Should I be asking a different question?