CSV Reader with row numbers

2016-09-14 Thread Akshay Sachdeva
Environment:
Apache Spark 1.6.2
Scala: 2.10

I am currently using the spark-csv package (courtesy of Databricks), and I
would like to have a pre-processing stage when reading the CSV file that
also adds a row number to each row of data being read from the file.
This will allow for better traceability and data lineage in case of
validation or data processing issues downstream.

From my research, it seems like the zipWithIndex API is the right, or
only, way to implement this pattern.

Would this be the preferred route?  Would it be safe for parallel
operations, i.e. with no collisions in the generated numbers?  Does anybody
have a similar requirement and a better solution you can point me to?

Appreciate any help and responses anyone can offer.

Thanks
-a
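
A minimal sketch of the zipWithIndex approach described above, assuming the
Databricks spark-csv package on Spark 1.6 with a SQLContext named sqlContext
in scope (the file path and the "row_number" column name are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Read the CSV through spark-csv as usual.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/path/to/data.csv")

// zipWithIndex assigns each row a unique, contiguous Long index (0-based),
// so there are no collisions across partitions; it does run an extra Spark
// job to compute per-partition offsets before assigning indices.
val rowsWithIndex = df.rdd.zipWithIndex().map { case (row, idx) =>
  Row.fromSeq(row.toSeq :+ idx)
}

// Re-attach the original schema plus a LongType column for the row number.
val schema = StructType(df.schema.fields :+
  StructField("row_number", LongType, nullable = false))
val dfWithRowNumbers = sqlContext.createDataFrame(rowsWithIndex, schema)

The numbering follows the row order within the RDD's partitions, which for a
plain file read is the file order; if only uniqueness is needed rather than
contiguous numbers, zipWithUniqueId avoids the extra job.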






Re: Not all KafkaReceivers processing the data Why?

2016-09-14 Thread Jeremy Smith
Take a look at how the messages are actually distributed across the
partitions. If the message keys have a low cardinality, you might get poor
distribution (i.e. all the messages are actually only in two of the five
partitions, leading to what you see in Spark).
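
As a rough illustration of that effect (this mimics generic hash-based key
partitioning, not Kafka's exact partitioner implementation; the keys and
counts are made up):

// With only two distinct keys, a hash-based partitioner can never spread the
// messages over more than two of the five partitions, however many are sent.
val numPartitions = 5
val messageKeys = Seq("us-east", "us-west", "us-east", "us-west", "us-east")
val partitionFor = (key: String) => math.abs(key.hashCode) % numPartitions
messageKeys.groupBy(partitionFor).mapValues(_.size).foreach {
  case (partition, count) => println(s"partition $partition -> $count messages")
}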

If you take a look at the Kafka data directories, you can probably get an
idea of the distribution by just examining the sizes of each partition.

Jeremy

On Wed, Sep 14, 2016 at 12:33 PM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
>
>
> I have created a Kafka topic with 5 partitions, and I am using the
> createStream receiver API as follows. But somehow only one receiver
> is getting the input data; the rest of the receivers are not processing
> anything. Can you please help?
>
>
>
> JavaPairDStream<String, String> messages = null;
>
> if (sparkStreamCount > 0) {
>     // We create an input DStream for each partition of the topic,
>     // unify those streams, and then repartition the unified stream.
>     List<JavaPairDStream<String, String>> kafkaStreams =
>         new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
>     for (int i = 0; i < sparkStreamCount; i++) {
>         kafkaStreams.add(KafkaUtils.createStream(jssc,
>             contextVal.getString(KAFKA_ZOOKEEPER),
>             contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
>     }
>     messages = jssc.union(kafkaStreams.get(0),
>         kafkaStreams.subList(1, kafkaStreams.size()));
> } else {
>     messages = KafkaUtils.createStream(jssc,
>         contextVal.getString(KAFKA_ZOOKEEPER),
>         contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
> }


Re: Saving less data to improve Pregel performance in GraphX?

2016-09-14 Thread Reynold Xin
This is definitely useful, but in reality it might be very difficult to do.


On Mon, Aug 29, 2016 at 6:46 PM, Fang Zhang  wrote:

> Dear developers,
>
> I am running some tests using Pregel API.
>
> It seems to me that more than 90% of the volume of a graph object is
> composed of index structures that will not change during the execution of
> Pregel. When a graph is too large to fit in memory, Pregel will persist
> intermediate graphs to disk on each iteration, which seems to involve a
> lot of repeated disk writes.
>
> In my test (Shortest Path), I save only one copy of the initial graph and
> maintain only a var of RDD[(VertexId, VD)]. During each iteration, to
> create new messages, I build a new graph from the updated
> RDD[(VertexId, VD)] and the fixed data in the initial graph. Using a slow
> NTFS hard drive, I did observe around a 40% overall improvement. Note that
> my updateVertices (corresponding to joinVertices) and edges.upgrade are
> not optimized yet (they can be optimized following the flow of GraphX),
> so the improvement should be from I/O.
>
> So my question is: do you think the current flow of Pregel could be
> improved by saving only a small portion of a large Graph object? If there
> are other concerns, could you explain them?
>
> Best regards,
> Fang
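
A hypothetical sketch of the pattern described above (the helper and its
signature are illustrative, not part of the GraphX API): the large, fixed edge
RDD is cached once, only the small vertex-state RDD is carried between
iterations, and the Graph is rebuilt from the two on each pass.

import scala.reflect.ClassTag
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

def iterateWithFixedEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],                  // fixed structure, cached once
    initialVertices: RDD[(VertexId, VD)],  // small, evolving vertex state
    maxIterations: Int)(
    step: Graph[VD, ED] => RDD[(VertexId, VD)]): RDD[(VertexId, VD)] = {
  edges.cache()
  var vertices = initialVertices
  var i = 0
  while (i < maxIterations) {
    // Rebuild the graph from the fixed edges and the latest vertex state,
    // so only the (comparatively small) vertex RDD changes between passes.
    val graph = Graph(vertices, edges)
    vertices = step(graph)
    i += 1
  }
  vertices
}

Whether this beats Pregel's own persistence depends on how expensive
rebuilding the internal indices is relative to the avoided disk writes, which
is presumably part of the difficulty Reynold is pointing at.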


Not all KafkaReceivers processing the data Why?

2016-09-14 Thread Rachana Srivastava
Hello all,

I have created a Kafka topic with 5 partitions, and I am using the createStream
receiver API as follows. But somehow only one receiver is getting the input
data; the rest of the receivers are not processing anything. Can you please help?

JavaPairDStream<String, String> messages = null;

if (sparkStreamCount > 0) {
    // We create an input DStream for each partition of the topic,
    // unify those streams, and then repartition the unified stream.
    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        kafkaStreams.add(KafkaUtils.createStream(jssc,
            contextVal.getString(KAFKA_ZOOKEEPER),
            contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }
    messages = jssc.union(kafkaStreams.get(0),
        kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    messages = KafkaUtils.createStream(jssc,
        contextVal.getString(KAFKA_ZOOKEEPER),
        contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}
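
For what it's worth, the comment in the snippet mentions repartitioning the
unified stream, but repartition() is never actually called. A Scala sketch of
the union-then-repartition pattern the comment alludes to, assuming a
StreamingContext named ssc and the same ZooKeeper/group/topic settings (all
names here are illustrative):

import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver-based input stream per desired receiver, then union and
// repartition so that downstream processing is spread across the cluster
// even when the receivers pull data unevenly.
val kafkaStreams = (1 to sparkStreamCount).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
}
val unified = ssc.union(kafkaStreams).repartition(ssc.sparkContext.defaultParallelism)

This only spreads the processing load, though; it does not change how Kafka
assigns partitions to the consumer threads, so Jeremy's point about key
distribution in the topic still applies.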









Re: Test fails when compiling spark with tests

2016-09-14 Thread Fred Reiss
Also try doing a fresh clone of the git repository. I've seen some of those
rare failure modes corrupt parts of my local copy in the past.

FWIW the main branch as of yesterday afternoon is building fine in my
environment.

Fred

On Tue, Sep 13, 2016 at 6:29 PM, Jakob Odersky  wrote:

> There are some flaky tests that occasionally fail; my first
> recommendation would be to re-run the test suite. Another thing to
> check is whether there are any applications listening on Spark's default
> ports.
> Btw, what is your environment like? If it is Windows, I don't
> think the tests are regularly run against that platform, so they
> could very well be broken there.
>
> On Sun, Sep 11, 2016 at 10:49 PM, assaf.mendelson  wrote:
> > Hi,
> >
> > I am trying to set up a Spark development environment. I forked the
> > Spark git project and cloned the fork. I then checked out the branch-2.0
> > tag (which I assume is the released source code).
> >
> > I then compiled Spark twice.
> >
> > The first using:
> >
> > mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
> >
> > This compiled successfully.
> >
> > The second using:
> >
> > mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 clean package
> >
> > This got a failure in Spark Project Core with the following test failing:
> >
> > - caching in memory and disk, replicated
> >
> > - caching in memory and disk, serialized, replicated *** FAILED ***
> >   java.util.concurrent.TimeoutException: Can't find 2 executors before 3 milliseconds elapsed
> >   at org.apache.spark.ui.jobs.JobProgressListener.waitUntilExecutorsUp(JobProgressListener.scala:573)
> >   at org.apache.spark.DistributedSuite.org$apache$spark$DistributedSuite$$testCaching(DistributedSuite.scala:154)
> >   at org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply$mcV$sp(DistributedSuite.scala:191)
> >   at org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
> >   at org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
> >   at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> >   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> >   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> >   at org.scalatest.Transformer.apply(Transformer.scala:22)
> >   at org.scalatest.Transformer.apply(Transformer.scala:20)
> >   ...
> >
> > - compute without caching when no partitions fit in memory
> >
> > I made no changes to the code whatsoever. Can anyone help me figure out
> > what is wrong with my environment?
> >
> > BTW I am using Maven 3.3.9 and Java 1.8.0_101-b13
> >
> > Thanks,
> >
> > Assaf


Re: Spark SQL - Applying transformation on a struct inside an array

2016-09-14 Thread Fred Reiss
+1 to this request. I talked last week with a product group within IBM that
is struggling with the same issue. It's pretty common in data cleaning
applications for data in the early stages to have nested lists or sets with
inconsistent or incomplete schema information.

Fred

On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> I'm currently trying to create a generic transformation mechanism on a
> DataFrame to modify an arbitrary column regardless of the underlying
> schema.
>
> It's "relatively" straightforward for complex types like struct<...>:
> apply an arbitrary UDF on the column and replace the data "inside" the
> struct. However, I'm struggling to make it work for complex types
> containing arrays along the way, like struct<array<struct<...>>>.
>
> Michael Armbrust seemed to allude on the mailing list/forum to a way of
> using Encoders to do that. I'd be interested in any pointers, especially
> considering that it's not possible to output any Row or
> GenericRowWithSchema from a UDF (thanks to
> https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657
> it seems).
>
> To sum up, I'd like to find a way to apply a transformation on complex
> nested datatypes (arrays and structs) on a DataFrame, updating the value
> itself.
>
> Regards,
>
> *Olivier Girardot*
>
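
A minimal sketch of the "relatively straightforward" struct case mentioned
above, assuming a DataFrame df with a column s of struct type holding string
fields a and b (the column names and the UDF are illustrative): the struct is
rebuilt field by field, with the UDF applied to only one field.

import org.apache.spark.sql.functions.{col, struct, udf}

// Transform one field inside the struct while copying the other unchanged.
val clean = udf((v: String) => if (v == null) null else v.trim)

val transformed = df.withColumn(
  "s",
  struct(
    col("s.a").as("a"),
    clean(col("s.b")).as("b")))

Once the column is an array of structs, its fields can no longer be addressed
column-wise like this, which is the gap being described; workarounds tend to
go through explode-and-regroup or typed Datasets rather than plain UDFs.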


Question about impersonation on Spark executor

2016-09-14 Thread Tao Li
Hi,

I am new to Spark and have a quick question about end-user impersonation in
the Spark executor process.

Basically, I am running SQL queries through the Spark thrift server with doAs
set to true to enable end-user impersonation. In my experiment, I was able to
start sessions for multiple end users at the same time, and all queries looked
fine. For example, user A can query table 1, which is accessible to A
exclusively (according to HDFS permissions). At the same time, user B can query
table 2, which is accessible to B exclusively. It looks like the end-user UGI
has been propagated to the executor process successfully. I checked the
SparkContext code, and it looks like the end-user info is passed to the
executor via the "SPARK_USER" environment variable. Correct me if I am wrong.

In my experiment, I only see one executor process running for all the queries
from multiple users. The question is how a single process can impersonate
multiple end users at the same time. I assume the value of the "SPARK_USER"
environment variable in the executor should be either user A or user B, so
there should be HDFS permission errors for the other user, but I did not see
any error for either user.

Can someone give some insights on that question? Thanks so much.
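
As background on how one JVM can act for more than one user at all, here is a
hedged sketch of Hadoop's proxy-user mechanism (UserGroupInformation.doAs),
which scopes impersonation to individual calls rather than to the whole
process. The user names and paths are made up, and this is not a claim about
what the thrift server or your executors do internally on your version.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

// The service (login) user impersonates "userA" for this one call only; HDFS
// authorizes the access as userA, provided the proxy-user rules permit it.
val proxyUgi = UserGroupInformation.createProxyUser(
  "userA", UserGroupInformation.getLoginUser)

val listing: Array[FileStatus] =
  proxyUgi.doAs(new PrivilegedExceptionAction[Array[FileStatus]] {
    override def run(): Array[FileStatus] =
      FileSystem.get(new Configuration()).listStatus(new Path("/user/userA/table1"))
  })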


Re: Subscription

2016-09-14 Thread Daniel Lopes
Hi Omkar,

Look at this link http://spark.apache.org/community.html to subscribe to
the right list.

Best,

*Daniel Lopes*
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br


On Sat, Sep 3, 2016 at 11:18 AM, Omkar Reddy 
wrote:

> Subscribe me!
>


sqlContext.registerDataFrameAsTable is not working properly in pyspark 2.0

2016-09-14 Thread sririshindra
Hi,

I have a production job that registers four different DataFrames as tables in
PySpark 1.6.2. When we upgraded to Spark 2.0, only three of the four
DataFrames get registered; the fourth DataFrame does not. There are no code
changes whatsoever. The only change is the Spark version, and when I revert to
Spark 1.6.2 the DataFrame is registered properly. Did anyone face a similar
issue? Is this a bug in Spark 2.0, or is it just a compatibility issue?


