Re: HBaseContext with Spark

2017-01-27 Thread Chetan Khatri
Ideally this would just be a storage handler bulk load:

SET hive.hbase.bulk=true;
INSERT OVERWRITE TABLE users SELECT … ;

But for now, you have to do some work and issue multiple Hive commands:
- Sample source data for range partitioning
- Save sampling results to a file
- Run a CLUSTER BY query using HiveHFileOutputFormat and TotalOrderPartitioner
  (sorts data, producing a large number of region files)
- Import the HFiles into HBase
- HBase can merge files if necessary
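
For the HBaseContext question raised further down this thread, a minimal Scala
sketch of how the hbase-spark module is typically wired up. This is a sketch
only, assuming the module is built against a compatible HBase release; the
table name "users", column family "cf", and sample data are placeholders, not
anything from the thread:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseContextSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-context-sketch"))
    // HBaseContext ships the HBase configuration to the executors so each
    // partition can reuse a connection instead of opening its own.
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // Placeholder data: (row key, value) pairs.
    val rdd = sc.parallelize(Seq(("row1", "alice"), ("row2", "bob")))

    // bulkPut turns each element into a Put against the target table.
    hbaseContext.bulkPut[(String, String)](
      rdd,
      TableName.valueOf("users"),
      (r: (String, String)) => {
        val put = new Put(Bytes.toBytes(r._1))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(r._2))
        put
      })

    sc.stop()
  }
}

The HFile-based bulk load path lives in the same module but is a separate API;
which HBase release line it is stable against is exactly the compatibility
question asked below.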

On Sat, Jan 28, 2017 at 11:32 AM, Chetan Khatri  wrote:

> @Ted, I don't think so.
>
> On Thu, Jan 26, 2017 at 6:35 AM, Ted Yu  wrote:
>
>> Does the storage handler provide bulk load capability ?
>>
>> Cheers
>>
>> On Jan 25, 2017, at 3:39 AM, Amrit Jangid 
>> wrote:
>>
>> Hi chetan,
>>
>> If you just need HBase Data into Hive, You can use Hive EXTERNAL TABLE
>> with
>>
>> STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'.
>>
>>
>> Try this and see if it solves your problem:
>>
>>
>> https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
>>
>>
>> Regards
>>
>> Amrit
>>
>>
>> .
>>
>> On Wed, Jan 25, 2017 at 5:02 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Community Folks,
>>>
>>> Currently I am using HBase 1.2.4 and Hive 1.2.1, and I am looking for bulk
>>> load from HBase to Hive.
>>>
>>> I have seen a couple of good examples in the HBase GitHub repo:
>>> https://github.com/apache/hbase/tree/master/hbase-spark
>>>
>>> If I would like to use HBaseContext with HBase 1.2.4, how can it be done?
>>> Or which version of HBase is more stable with HBaseContext?
>>>
>>> Thanks.
>>>
>>
>>
>>
>>
>>
>


Re: spark architecture question -- Please Read

2017-01-27 Thread Russell Spitzer
You can treat Oracle as a JDBC source (
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases)
and skip Sqoop, HiveTables and go straight to Queries. Then you can skip
hive on the way back out (see the same link) and write directly to Oracle.
I'll leave the performance questions for someone else.
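
A minimal sketch of that path, assuming the Oracle JDBC driver jar is on the
classpath; the connection details and table names are placeholders:

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("oracle-direct").getOrCreate()

// Placeholder connection details; the ojdbc driver jar must be on the classpath.
val url = "jdbc:oracle:thin:@//dbhost:1521/ORCL"
val props = new Properties()
props.setProperty("user", "scott")
props.setProperty("password", "tiger")
props.setProperty("driver", "oracle.jdbc.OracleDriver")

// Read straight from Oracle instead of Sqoop --> Hive.
val src = spark.read.jdbc(url, "SOURCE_SCHEMA.SOURCE_TABLE", props)

// ... apply Scala UDFs / transformations to `src` here ...
val result = src   // placeholder for the transformed DataFrame

// Write straight back to Oracle instead of Hive --> Sqoop export.
result.write.mode(SaveMode.Append).jdbc(url, "TARGET_SCHEMA.TARGET_TABLE", props)

For large source tables, the partitionColumn, lowerBound, upperBound, and
numPartitions options on the JDBC source control how the read is parallelized.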

On Fri, Jan 27, 2017 at 11:06 PM Sirisha Cheruvu  wrote:

>
> On Sat, Jan 28, 2017 at 6:44 AM, Sirisha Cheruvu 
> wrote:
>
> Hi Team,
>
> Right now our existing flow is:
>
> Oracle --> Sqoop --> Hive --> Hive queries on Spark SQL (HiveContext) -->
> destination Hive table --> Sqoop export to Oracle
>
> Half of the Hive UDFs required are developed as Java UDFs.
>
> So now I want to know: if I run native Scala UDFs instead of running the
> Hive Java UDFs in Spark SQL, will there be any performance difference?
>
> Can we skip the Sqoop import and export part and instead directly load data
> from Oracle into Spark, code Scala UDFs for the transformations, and export
> the output data back to Oracle?
>
> Right now the architecture we are using is:
>
> Oracle --> Sqoop (import) --> Hive tables --> Hive queries --> Spark SQL -->
> Hive --> Oracle
>
> What would be the optimal architecture to process data from Oracle using
> Spark? Can I improve this process in any way?
>
>
>
>
> Regards,
> Sirisha
>
>
>


Re: spark architecture question -- Please Read

2017-01-27 Thread Sirisha Cheruvu
On Sat, Jan 28, 2017 at 6:44 AM, Sirisha Cheruvu  wrote:

> Hi Team,
>
> Right now our existing flow is:
>
> Oracle --> Sqoop --> Hive --> Hive queries on Spark SQL (HiveContext) -->
> destination Hive table --> Sqoop export to Oracle
>
> Half of the Hive UDFs required are developed as Java UDFs.
>
> So now I want to know: if I run native Scala UDFs instead of running the
> Hive Java UDFs in Spark SQL, will there be any performance difference?
>
> Can we skip the Sqoop import and export part and instead directly load data
> from Oracle into Spark, code Scala UDFs for the transformations, and export
> the output data back to Oracle?
>
> Right now the architecture we are using is:
>
> Oracle --> Sqoop (import) --> Hive tables --> Hive queries --> Spark SQL -->
> Hive --> Oracle
>
> What would be the optimal architecture to process data from Oracle using
> Spark? Can I improve this process in any way?
>
>
>
>
> Regards,
> Sirisha
>


spark architecture question -- Please Read

2017-01-27 Thread Sirisha Cheruvu
Hi Team,

Right now our existing flow is:

Oracle --> Sqoop --> Hive --> Hive queries on Spark SQL (HiveContext) -->
destination Hive table --> Sqoop export to Oracle

Half of the Hive UDFs required are developed as Java UDFs.

So now I want to know: if I run native Scala UDFs instead of running the Hive
Java UDFs in Spark SQL, will there be any performance difference?

Can we skip the Sqoop import and export part and instead directly load data
from Oracle into Spark, code Scala UDFs for the transformations, and export
the output data back to Oracle?

Right now the architecture we are using is:

Oracle --> Sqoop (import) --> Hive tables --> Hive queries --> Spark SQL -->
Hive --> Oracle

What would be the optimal architecture to process data from Oracle using
Spark? Can I improve this process in any way?




Regards,
Sirisha


CFP for Spark Summit San Francisco closes on Feb. 6

2017-01-27 Thread Scott walent
In June, the 10th Spark Summit will take place in San Francisco at Moscone
West. We have expanded our CFP to include more topics and deep-dive
technical sessions.

Take center stage in front of your fellow Spark enthusiasts. Submit your
presentation and join us for the big ten. The CFP closes on February 6th!

Submit your abstracts at https://spark-summit.org/2017


RE: spark 2.02 error when writing to s3

2017-01-27 Thread VND Tremblay, Paul
Not sure what you mean by "a consistency layer on top." Any explanation would 
be greatly appreciated!

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Friday, January 27, 2017 3:20 AM
To: VND Tremblay, Paul
Cc: Neil Jonkers; Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

OK

Nobody should be committing output directly to S3 without having something add
a consistency layer on top, not if you want reliable (as in "doesn't
lose/corrupt data" reliable) work.

On 26 Jan 2017, at 19:09, VND Tremblay, Paul 
> wrote:

This seems to have done the trick, although I am not positive. If I have time, 
I'll test spinning up a cluster with and without consistent view to pin point 
the error.

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_


From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and use s3:// uri.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

 Original message 
From: Steve Loughran
Date:20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro ,user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET
call may show it. You may be seeing that effect; even after the DELETE has got
rid of the files, a listing still sees something there. And I suspect the time
it takes for the listing to "go away" will depend on the total number of
entries underneath, as there are more deletion markers ("tombstones") to
propagate around S3.

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_



From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks=args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep="|",
        header=True,
        nullValue='',
        quote=None,
        path=path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.
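
(A hedged aside, in Scala rather than this thread's PySpark since the
DataFrameWriter API has the same shape: SaveMode.Overwrite folds the delete
into the write itself, so a separate remove-then-write step is not needed. The
eventual-consistency caveat discussed elsewhere in this thread still applies
to the underlying S3 listing; the path below is a placeholder.)

import org.apache.spark.sql.SaveMode

df.write
  .mode(SaveMode.Overwrite)      // Spark removes the existing output path itself
  .option("sep", "|")
  .option("header", "true")
  .option("nullValue", "")
  .csv("s3://some-bucket/revenue_model/")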

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any 

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Michael Armbrust
Yeah, Kafka server/client compatibility can be pretty confusing and does
not give good errors in the case of mismatches. This should be addressed
in the next release of Kafka (they are adding an API to query the server's
capabilities).
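
Until that lands, the practical guard is to let the connector dictate the
client version rather than pinning kafka-clients yourself. A hedged build.sbt
sketch; that spark-sql-kafka-0-10 2.1.0 resolves a 0.10.0.x client is an
assumption based on the resolution quoted below:

// build.sbt sketch: depend on the structured-streaming Kafka source and let it
// resolve its own kafka-clients instead of forcing the 0.10.1.1 client that
// caused the silent mismatch described in this thread.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
  // no explicit "org.apache.kafka" % "kafka-clients" override here
)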

On Fri, Jan 27, 2017 at 12:56 PM, Koert Kuipers  wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers  wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>> wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>> successful heartbeat response for group spark-kafka-source-fac4f749-fd
>> 56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2 but then sets
>> latest offset to 0 for that partition? or am i misunderstanding?
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:
>>
>>> code:
>>>   val query = spark.readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "somenode:9092")
>>> .option("subscribe", "wikipedia")
>>> .load
>>> .select(col("value") cast StringType)
>>> .writeStream
>>> .format("console")
>>> .outputMode(OutputMode.Append)
>>> .start()
>>>
>>>   while (true) {
>>> Thread.sleep(1)
>>> println(query.lastProgress)
>>>   }
>>> }
>>>
>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <
>>> alons...@gmail.com> wrote:
>>>
 lets see the code...

 Alonso Isidoro Roman
 

Re: Dynamic resource allocation to Spark on Mesos

2017-01-27 Thread Mihai Iacob
What about Spark on Kubernetes, is there a way to manage dynamic resource allocation?
 
Regards, 

Mihai Iacob 
 
- Original message -
From: Michael Gummelt
To: Ji Yan
Cc: user
Subject: Re: Dynamic resource allocation to Spark on Mesos
Date: Fri, Jan 27, 2017 2:14 PM

> The way I understand is that the Spark job will not run if the CPU/Mem
> requirement is not met.

Spark jobs will still run if they only have a subset of the requested
resources. Tasks begin scheduling as soon as the first executor comes up.
Dynamic allocation yields increased utilization by only allocating as many
executors as a job needs, rather than a single static amount set up front.

Dynamic Allocation is supported in Spark on Mesos, but we here at Mesosphere
haven't been testing it much, and I'm not sure what the community adoption is.
So I can't yet speak to its robustness, but we will be investing in it soon.
Many users want it.
 
On Fri, Jan 27, 2017 at 9:35 AM, Ji Yan  wrote:

Dear Spark Users,
 
Currently is there a way to dynamically allocate resources to Spark on Mesos? Within Spark we can specify the CPU cores, memory before running job. The way I understand is that the Spark job will not run if the CPU/Mem requirement is not met. This may lead to decrease in overall utilization of the cluster. An alternative behavior is to launch the job with the best resource offer Mesos is able to give. Is this possible with the current implementation?
 
Thanks
Ji 

The information in this email is confidential and may be legally privileged.
It is intended solely for the addressee. Access to this email by anyone else
is unauthorized. If you are not the intended recipient, any disclosure,
copying, distribution or any action taken or omitted to be taken in reliance
on it, is prohibited and may be unlawful.

--
Michael Gummelt
Software Engineer
Mesosphere
 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
in case anyone else runs into this:

the issue is that i was using kafka-clients 0.10.1.1

it works when i use kafka-clients 0.10.0.1 with spark structured streaming

my kafka server is 0.10.1.1

On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers  wrote:

> i checked my topic. it has 5 partitions but all the data is written to a
> single partition: wikipedia-2
> i turned on debug logging and i see this:
>
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
> wikipedia-1]. Seeking to the end.
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
> successful heartbeat response for group spark-kafka-source-fac4f749-
> fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=152908} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-1 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
> wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> what is confusing to me is this:
> Resetting offset for partition wikipedia-2 to latest offset.
> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
> wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> why does it find latest offset 152908 for wikipedia-2 but then sets latest
> offset to 0 for that partition? or am i misunderstanding?
>
> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:
>
>> code:
>>   val query = spark.readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", "somenode:9092")
>> .option("subscribe", "wikipedia")
>> .load
>> .select(col("value") cast StringType)
>> .writeStream
>> .format("console")
>> .outputMode(OutputMode.Append)
>> .start()
>>
>>   while (true) {
>> Thread.sleep(1)
>> println(query.lastProgress)
>>   }
>> }
>>
>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman > > wrote:
>>
>>> lets see the code...
>>>
>>> Alonso Isidoro Roman
>>> [image: https://]about.me/alonso.isidoro.roman
>>>
>>> 
>>>
>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers :
>>>
 my little program prints out query.lastProgress every 10 seconds, and
 this is what it shows:

 {
   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
   "runId" : 

Re: Issue creating row with java.util.Map type

2017-01-27 Thread Ankur Srivastava
Thank you Richard for responding.

I am able to run it successfully by using row.getMap, but since I have to
update the map I wanted to use the HashMap API. Is there a way I can do that?
And I am surprised that it works in the first case, where I am creating the
Dataset from a list of rows, but fails in the map function.

Thanks
Ankur
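
On the update question, a hedged Scala sketch of the pattern (the Java route
would go through JavaConverters to turn the updated java.util.Map back into a
Scala map): the external type Spark expects for a MapType column inside a Row
is a Scala Map, so copy the row's map into an immutable Scala Map, modify that
copy, and put it into the new Row. The column positions, key name, and the
`ds` Dataset are illustrative only:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// `ds` is a Dataset[Row] with schema (src: String, dst: String, freq: Map[String, Int]).
val updated = ds.map { row =>
  // getMap returns a scala.collection.Map; .toMap gives an immutable copy we can "update".
  val freq = row.getMap[String, Int](2).toMap
  val bumped = freq + ("example-key" -> (freq.getOrElse("example-key", 0) + 1))
  Row(row.getString(0), row.getString(1), bumped)
}(RowEncoder(ds.schema))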

On Fri, Jan 27, 2017 at 12:15 PM, Richard Xin 
wrote:

> try
> Row newRow = RowFactory.create(row.getString(0), row.getString(1), 
> row.getMap(2));
>
>
>
> On Friday, January 27, 2017 10:52 AM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>
> + DEV Mailing List
>
> On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
> Hi,
>
> I am trying to map a Dataset with rows which have a map attribute. When I
> try to create a Row with the map attribute I get cast errors. I am able to
> reproduce the issue with the below sample code. The surprising thing is
> with same schema I am able to create a dataset from the List of rows.
>
> I am on Spark 2.0 and scala 2.11
>
> public static void main(String[] args) {
>     StructType schema = new StructType().add("src", DataTypes.StringType)
>             .add("dst", DataTypes.StringType)
>             .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
>     List inputData = new ArrayList<>();
>     inputData.add(RowFactory.create("1", "2", new HashMap<>()));
>     SparkSession sparkSession = SparkSession
>             .builder()
>             .appName("IPCountFilterTest")
>             .master("local")
>             .getOrCreate();
>
>     Dataset out = sparkSession.createDataFrame(inputData, schema);
>     out.show();
>
>     Encoder rowEncoder = RowEncoder.apply(schema);
>     out.map((MapFunction) row -> {
>         Row newRow = RowFactory.create(row.getString(0), row.getString(1), new HashMap());
>
>         //Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));
>
>         return newRow;
>     }, rowEncoder).show();
> }
>
> Below is the error:
>
> 17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(

Re: Issue creating row with java.util.Map type

2017-01-27 Thread Richard Xin
try
Row newRow = RowFactory.create(row.getString(0), row.getString(1), 
row.getMap(2)); 

On Friday, January 27, 2017 10:52 AM, Ankur Srivastava 
 wrote:
 

 + DEV Mailing List
On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava  
wrote:

Hi,
I am trying to map a Dataset with rows which have a map attribute. When I try 
to create a Row with the map attribute I get cast errors. I am able to 
reproduce the issue with the below sample code. The surprising thing is with 
same schema I am able to create a dataset from the List of rows.
I am on Spark 2.0 and Scala 2.11.

public static void main(String[] args) {
    StructType schema = new StructType().add("src", DataTypes.StringType)
            .add("dst", DataTypes.StringType)
            .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
    List inputData = new ArrayList<>();
    inputData.add(RowFactory.create("1", "2", new HashMap<>()));
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("IPCountFilterTest")
            .master("local")
            .getOrCreate();

    Dataset out = sparkSession.createDataFrame(inputData, schema);
    out.show();

    Encoder rowEncoder = RowEncoder.apply(schema);
    out.map((MapFunction) row -> {
        Row newRow = RowFactory.create(row.getString(0), row.getString(1), new HashMap());
        //Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));
        return newRow;
    }, rowEncoder).show();
}

Below is the error:

17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks
Ankur



   

Re: Dynamic resource allocation to Spark on Mesos

2017-01-27 Thread Michael Gummelt
> The way I understand is that the Spark job will not run if the CPU/Mem
requirement is not met.

Spark jobs will still run if they only have a subset of the requested
resources.  Tasks begin scheduling as soon as the first executor comes up.
Dynamic allocation yields increased utilization by only allocating as many
executors as a job needs, rather than a single static amount set up front.

Dynamic Allocation is supported in Spark on Mesos, but we here at
Mesosphere haven't been testing it much, and I'm not sure what the
community adoption is.  So I can't yet speak to its robustness, but we will
be investing in it soon.  Many users want it.
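
For anyone who wants to try it, a sketch of the Spark settings involved; the
keys are standard, but on Mesos the external shuffle service also has to be
running on every agent, and deploying that service is outside this sketch:

import org.apache.spark.SparkConf

// Dynamic allocation needs the external shuffle service so executors can be
// removed without losing the shuffle files they wrote.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")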

On Fri, Jan 27, 2017 at 9:35 AM, Ji Yan  wrote:

> Dear Spark Users,
>
> Currently is there a way to dynamically allocate resources to Spark on
> Mesos? Within Spark we can specify the CPU cores, memory before running
> job. The way I understand is that the Spark job will not run if the CPU/Mem
> requirement is not met. This may lead to decrease in overall utilization of
> the cluster. An alternative behavior is to launch the job with the best
> resource offer Mesos is able to give. Is this possible with the current
> implementation?
>
> Thanks
> Ji
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Issue creating row with java.util.Map type

2017-01-27 Thread Ankur Srivastava
+ DEV Mailing List

On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi,
>
> I am trying to map a Dataset with rows which have a map attribute. When I
> try to create a Row with the map attribute I get cast errors. I am able to
> reproduce the issue with the below sample code. The surprising thing is
> with same schema I am able to create a dataset from the List of rows.
>
> I am on Spark 2.0 and scala 2.11
>
> public static void main(String[] args) {
> StructType schema = new StructType().add("src", DataTypes.StringType)
> .add("dst", DataTypes.StringType)
> .add("freq", DataTypes.createMapType(DataTypes.StringType, 
> DataTypes.IntegerType));
> List inputData = new ArrayList<>();
> inputData.add(RowFactory.create("1", "2", new HashMap<>()));
> SparkSession sparkSession = SparkSession
> .builder()
> .appName("IPCountFilterTest")
> .master("local")
> .getOrCreate();
>
> Dataset out = sparkSession.createDataFrame(inputData, schema);
> out.show();
>
> Encoder rowEncoder = RowEncoder.apply(schema);
> out.map((MapFunction) row -> {
> Row newRow = RowFactory.create(row.getString(0), row.getString(1), 
> new HashMap());
>
>//Row newRow = RowFactory.create(row.getString(0), row.getString(1), 
> row.getJavaMap(2));
>
> return newRow;
> }, rowEncoder).show();
> }
>
> Below is the error:
>
> 17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
> java.lang.RuntimeException: java.util.HashMap is not a valid external type
> for schema of map
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:246)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.RuntimeException: java.util.HashMap is not a valid
> external type for schema of map
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:246)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Thanks
> Ankur
>


Re: kafka structured streaming source refuses to read

2017-01-27 Thread Shixiong(Ryan) Zhu
Thanks for reporting this. Which Spark version are you using? Could you
provide the full log, please?

On Fri, Jan 27, 2017 at 10:24 AM, Koert Kuipers  wrote:

> i checked my topic. it has 5 partitions but all the data is written to a
> single partition: wikipedia-2
> i turned on debug logging and i see this:
>
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
> wikipedia-1]. Seeking to the end.
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
> successful heartbeat response for group spark-kafka-source-fac4f749-
> fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=152908} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-1 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
> wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> what is confusing to me is this:
> Resetting offset for partition wikipedia-2 to latest offset.
> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
> wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> why does it find latest offset 152908 for wikipedia-2 but then sets latest
> offset to 0 for that partition? or am i misunderstanding?
>
> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:
>
>> code:
>>   val query = spark.readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", "somenode:9092")
>> .option("subscribe", "wikipedia")
>> .load
>> .select(col("value") cast StringType)
>> .writeStream
>> .format("console")
>> .outputMode(OutputMode.Append)
>> .start()
>>
>>   while (true) {
>> Thread.sleep(1)
>> println(query.lastProgress)
>>   }
>> }
>>
>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman > > wrote:
>>
>>> lets see the code...
>>>
>>> Alonso Isidoro Roman
>>> [image: https://]about.me/alonso.isidoro.roman
>>>
>>> 
>>>
>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers :
>>>
 my little program prints out query.lastProgress every 10 seconds, and
 this is what it shows:

 {
   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
   "name" : "wiki",
   "timestamp" : 

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
i checked my topic. it has 5 partitions but all the data is written to a
single partition: wikipedia-2
i turned on debug logging and i see this:

2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
wikipedia-1]. Seeking to the end.
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-0
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-4
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-3
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-2
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-1
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-0 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-0 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-4 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-4 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-3 to latest offset.
2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
successful heartbeat response for group
spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-3 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-2 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=152908} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-2 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-1 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-1
2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
wikipedia-3 -> 0, wikipedia-0 -> 0)

what is confusing to me is this:
Resetting offset for partition wikipedia-2 to latest offset.
Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

why does it find latest offset 152908 for wikipedia-2 but then sets latest
offset to 0 for that partition? or am i misunderstanding?

On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:

> code:
>   val query = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "somenode:9092")
> .option("subscribe", "wikipedia")
> .load
> .select(col("value") cast StringType)
> .writeStream
> .format("console")
> .outputMode(OutputMode.Append)
> .start()
>
>   while (true) {
> Thread.sleep(1)
> println(query.lastProgress)
>   }
> }
>
> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman 
> wrote:
>
>> lets see the code...
>>
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> 
>>
>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers :
>>
>>> my little program prints out query.lastProgress every 10 seconds, and
>>> this is what it shows:
>>>
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>   "numInputRows" : 0,
>>>   "inputRowsPerSecond" : 0.0,
>>>   "processedRowsPerSecond" : 0.0,
>>>   "durationMs" : {
>>> "getOffset" : 9,
>>> "triggerExecution" : 10
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>>> "startOffset" : {
>>>   

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
code:
  val query = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "somenode:9092")
.option("subscribe", "wikipedia")
.load
.select(col("value") cast StringType)
.writeStream
.format("console")
.outputMode(OutputMode.Append)
.start()

  while (true) {
Thread.sleep(1)
println(query.lastProgress)
  }
}
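
Separate from the kafka-clients version mismatch that turned out to be the
cause (see the follow-up message in this thread), one thing worth checking
when a new query appears to read nothing: the source starts at the latest
offsets by default, so records already in the topic are skipped unless
startingOffsets is set. A sketch of the same reader with that option added
(option name as of Spark 2.1; everything else as above):

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "somenode:9092")
  .option("subscribe", "wikipedia")
  .option("startingOffsets", "earliest")   // read the topic from the beginning
  .load
  .select(col("value") cast StringType)
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()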

On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman 
wrote:

> lets see the code...
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> 
>
> 2017-01-27 5:56 GMT+01:00 Koert Kuipers :
>
>> my little program prints out query.lastProgress every 10 seconds, and
>> this is what it shows:
>>
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 9,
>> "triggerExecution" : 10
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>> "startOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "endOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>> "description" : "org.apache.spark.sql.executio
>> n.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 5,
>> "triggerExecution" : 5
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>> "startOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "endOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>> "description" : "org.apache.spark.sql.executio
>> n.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 5,
>> "triggerExecution" : 5
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>> "startOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "endOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>> "description" : "org.apache.spark.sql.executio
>> n.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 4,
>> "triggerExecution" : 4
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>> "startOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "endOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>> "description" : 

Converting timezones in Spark

2017-01-27 Thread Don Drake
I'm reading CSV with a timestamp clearly identified in the UTC timezone,
and I need to store this in a parquet format and eventually read it back
and convert to different timezones as needed.

Sounds straightforward, but this involves some crazy function calls and I'm
seeing strange results as I build a test case.

See my example below.  Why are the values for est_ts and cst_ts the same in
rows 1 and 3 (wrong), but different and correct in row 4?  I have a feeling
it has to do with daylight savings time, but I'm not sure where to resolve
it.

Please note that I'm in the Central timezone.

Is there a better method to do this?


Spark context available as 'sc' (master = local[*], app id =
local-1485539128193).

Spark session available as 'spark'.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/


Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_60)

Type in expressions to have them evaluated.

Type :help for more information.


scala> :paste

// Entering paste mode (ctrl-D to finish)


import org.apache.spark.sql.Column

def stringts_to_tz(col: Column, tz: String) = {
  from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col,
    "yyyy-MM-dd HH:mm:ss Z")), "CST"), tz)
}

val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
  (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01 UTC")).toDF("id", "dts")

val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z").cast("timestamp"))
  .withColumn("unix_ts", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z"))
  .withColumn("local_hour", hour($"created_at"))
  .withColumn("s2", from_unixtime($"unix_ts"))
  .withColumn("s3", to_utc_timestamp($"s2", "CST"))
  .withColumn("s4", from_utc_timestamp($"s3", "EST"))
  .withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))
  .withColumn("est_ts", stringts_to_tz($"dts", "CST"))
  .withColumn("cst_ts", stringts_to_tz($"dts", "EST"))

df2.show(4, false)

df2.printSchema



// Exiting paste mode, now interpreting.


+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|id |dts                    |created_at           |unix_ts   |local_hour|s2                 |s3                   |s4                   |utc_ts               |est_ts               |cst_ts               |
+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|1473871592|11        |2016-09-14 11:46:32|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 16:46:32.0|2016-09-14 11:46:32.0|2016-09-14 11:46:32.0|
|2  |not a timestamp        |null                 |null      |null      |null               |null                 |null                 |null                 |null                 |null                 |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|1473872397|11        |2016-09-14 11:59:57|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 16:59:57.0|2016-09-14 11:59:57.0|2016-09-14 11:59:57.0|
|4  |2016-11-31 12:00:01 UTC|2016-12-01 06:00:01.0|1480593601|6         |2016-12-01 06:00:01|2016-12-01 12:00:01.0|2016-12-01 07:00:01.0|2016-12-01 12:00:01.0|2016-12-01 06:00:01.0|2016-12-01 07:00:01.0|
+---+-----------------------+---------------------+----------+----------+-------------------+---------------------+---------------------+---------------------+---------------------+---------------------+


root

 |-- id: long (nullable = false)

 |-- dts: string (nullable = true)

 |-- created_at: timestamp (nullable = true)

 |-- unix_ts: long (nullable = true)

 |-- local_hour: integer (nullable = true)

 |-- s2: string (nullable = true)

 |-- s3: timestamp (nullable = true)

 |-- s4: timestamp (nullable = true)

 |-- utc_ts: timestamp (nullable = true)

 |-- est_ts: timestamp (nullable = true)

 |-- cst_ts: timestamp (nullable = true)


import org.apache.spark.sql.Column

stringts_to_tz: (col: org.apache.spark.sql.Column, tz:
String)org.apache.spark.sql.Column

df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]

df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 9 more
fields]


scala>
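
One way to narrow down the daylight-saving suspicion raised above is to
inspect the zone IDs directly on the JVM, outside Spark. A hedged, Spark-free
sketch using the two instants from rows 1 and 4; how the JDK resolves the
three-letter zone IDs is exactly what it checks:

import java.text.SimpleDateFormat
import java.util.TimeZone

// The instants from rows 1 and 4, expressed in UTC.
val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z")
val september = fmt.parse("2016-09-14 16:46:32 +0000").getTime
val december  = fmt.parse("2016-12-01 12:00:01 +0000").getTime

// Whether each zone ID observes DST, and the offset it applies on each date.
for (id <- Seq("UTC", "CST", "EST")) {
  val tz = TimeZone.getTimeZone(id)
  val sepHours = tz.getOffset(september) / 3600000
  val decHours = tz.getOffset(december) / 3600000
  println(s"$id: useDaylightTime=${tz.useDaylightTime}, September offset=$sepHours h, December offset=$decHours h")
}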

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Dynamic resource allocation to Spark on Mesos

2017-01-27 Thread Ji Yan
Dear Spark Users,

Currently is there a way to dynamically allocate resources to Spark on
Mesos? Within Spark we can specify the CPU cores, memory before running
job. The way I understand is that the Spark job will not run if the CPU/Mem
requirement is not met. This may lead to decrease in overall utilization of
the cluster. An alternative behavior is to launch the job with the best
resource offer Mesos is able to give. Is this possible with the current
implementation?

Thanks
Ji

-- 
 

The information in this email is confidential and may be legally 
privileged. It is intended solely for the addressee. Access to this email 
by anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be 
taken in reliance on it, is prohibited and may be unlawful.


Re: Making withColumn nullable

2017-01-27 Thread Koert Kuipers
it should be by default nullable except for certain primitives where it
defaults to non-nullable

you can use Option for your return value to indicate nullability.
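
If the column is already there and stuck as non-nullable, a common workaround
(a sketch, not an official switch for flipping nullability) is to rebuild the
DataFrame against a copy of the schema with the flag flipped; a variant could
flip only the single added column instead of all of them:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Rebuild `df` with every column marked nullable; the data is untouched,
// only the schema metadata changes.
def setNullable(df: DataFrame): DataFrame = {
  val newSchema = StructType(df.schema.map(_.copy(nullable = true)))
  df.sparkSession.createDataFrame(df.rdd, newSchema)
}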

On Fri, Jan 27, 2017 at 10:32 AM, Ninad Shringarpure 
wrote:

> Hi Team,
>
> When I add a column to my data frame using withColumn and assign some
> value, the schema is automatically created with this column as not nullable.
> In my final Hive table schema, where I want to insert the data, this column
> is nullable, and hence it throws an error when I try to save.
>
> Is there a way of making the column I add with the withColumn method
> nullable?
>
> Thanks,
> Ninad
>


Issue with caching

2017-01-27 Thread Anil Langote
Hi All 

I am trying to cache a large dataset with a serialized in-memory storage level
and Kryo serialization enabled. When I run my Spark job multiple times I get
different performance; at times, caching the dataset hangs and takes forever.
What is wrong?

The best time I got is 20 minutes, and sometimes with the same configuration
it takes 40 minutes. Why is this happening?

Best Regards,
Anil Langote
+1-425-633-9747
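
For reference, a hedged sketch of pinning down the setup being described (the
record type and data are placeholders; class registration is optional but is
usually what makes Kryo pay off):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

case class MyRecord(id: Long, value: String)   // placeholder record type

val conf = new SparkConf()
  .setAppName("cache-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes avoids writing full class names into every serialized record.
  .registerKryoClasses(Array(classOf[MyRecord]))

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1L to 1000000L).map(i => MyRecord(i, s"value-$i"))

// MEMORY_ONLY_SER keeps partitions as serialized (Kryo) bytes in memory:
// smaller and more predictable than deserialized caching, at some CPU cost on access.
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count()   // the first action materializes the cache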

Making withColumn nullable

2017-01-27 Thread Ninad Shringarpure
Hi Team,

When I add a column to my data frame using withColumn and assign some
value, the schema is automatically created with this column as not nullable.
In my final Hive table schema, where I want to insert the data, this column
is nullable, and hence it throws an error when I try to save.

Is there a way of making the column I add with the withColumn method
nullable?

Thanks,
Ninad


Re: Text

2017-01-27 Thread Jörn Franke
Sorry, the message was not complete: the key is the file position, so if you
sort by key the lines will be in the same order as in the original file.
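
A rough sketch of that approach (untested; the word replacement and paths are just illustrative, and the byte-offset key recovers the original order for a single input file):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ordered-text").setMaster("local[*]"))

// Read via hadoopFile so the byte offset of each line is available as the key.
// Convert the Text value to String immediately, since Hadoop reuses Writables.
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("input.txt")
  .map { case (offset, text) => (offset.get, text.toString) }

// Do the word corrections on the values only, keeping the offset key.
val corrected = lines.mapValues(_.replaceAll("teh", "the"))

// Sorting by the offset restores the original line order before writing.
corrected.sortByKey().values.saveAsTextFile("output")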

> On 27 Jan 2017, at 14:45, Jörn Franke  wrote:
> 
> I agree with the previous statements. You cannot expect any ordering 
> guarantee. This means you need to ensure that the same ordering is done as 
> the original file. Internally Spark is using the Hadoop Client libraries - 
> even if you do not have Hadoop installed, because it is a flexible 
> transparent solution to access many file systems including the local one. In 
> the case you mentioned it is the TextInputFormat that returns a key and 
> the value. The key is the byte offset of the line in the file.
> This means you can sort by the key.
> However, to access this key you must use the hadoopFile method of Spark 
> together with the TextInputFormat.
> 
>> On 27 Jan 2017, at 10:44, Soheila S.  wrote:
>> 
>> Hi All,
>> I read a test file using sparkContext.textfile(filename) and assign it to an 
>> RDD and process the RDD (replace some words) and finally write it to a text 
>> file using rdd.saveAsTextFile(output).
>> Is there any way to be sure the order of the sentences will not be changed? 
>> I need to have the same text with some corrected words.
>> 
>> thanks!
>> 
>> Soheila

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Text

2017-01-27 Thread Jörn Franke
I agree with the previous statements. You cannot expect any ordering guarantee. 
This means you need to ensure that the same ordering is done as the original 
file. Internally Spark is using the Hadoop Client libraries - even if you do 
not have Hadoop installed, because it is a flexible transparent solution to 
access many file systems, including the local one. In the case you mentioned it 
is the TextInputFormat that returns a key and the value. The key is the byte 
offset of the line in the file. This means you can sort by the key.
However, to access this key you must use the hadoopFile method of Spark together 
with the TextInputFormat.

> On 27 Jan 2017, at 10:44, Soheila S.  wrote:
> 
> Hi All,
> I read a test file using sparkContext.textfile(filename) and assign it to an 
> RDD and process the RDD (replace some words) and finally write it to a text 
> file using rdd.saveAsTextFile(output).
> Is there any way to be sure the order of the sentences will not be changed? I 
> need to have the same text with some corrected words.
> 
> thanks!
> 
> Soheila

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Text

2017-01-27 Thread ayan guha
I would not count on the order-preserving nature of the operations, because it
is not guaranteed. I would assign an order to the sentences up front and sort on
it at the end, before writing back.
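
A sketch of that tagging approach (untested; the replacement and paths are only illustrative, and it assumes the index assigned by zipWithIndex on a freshly read textFile reflects the file order):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ordered-rewrite").setMaster("local[*]"))

// Tag each line with its position right after reading.
val indexed = sc.textFile("input.txt").zipWithIndex()                     // (line, index)

// Carry the index through the transformation as the key.
val corrected = indexed.map { case (line, idx) => (idx, line.replaceAll("teh", "the")) }

// Sort on the index at the end, then drop it and write.
corrected.sortByKey().values.saveAsTextFile("output")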

On Fri, 27 Jan 2017 at 10:59 pm, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Some operations like map, filter, flatMap and coalesce (with
> shuffle=false) usually preserve the order. However, sortBy, reduceBy,
> partitionBy, join etc. do not.
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
> On 27 January 2017 at 09:44, Soheila S.  wrote:
>
> Hi All,
> I read a test file using sparkContext.textfile(filename) and assign it to
> an RDD and process the RDD (replace some words) and finally write it to
> a text file using rdd.saveAsTextFile(output).
> Is there any way to be sure the order of the sentences will not be
> changed? I need to have the same text with some corrected words.
>
> thanks!
>
> Soheila
>
>
> --
Best Regards,
Ayan Guha


Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread Graham Clark
Hi - thanks for the responses. You are right that I started by copying the
word-counting example. I assumed that this would help spread the load
evenly across the cluster, with each worker receiving a portion of the
stream data - corresponding to one shard's worth - and then keeping the
data local until something invoked a shuffle. I did check that the data
isn't skewed in Kinesis - it seems to be pretty well randomly distributed
across the shards in the stream. In reducing the code to this example, I
hoped to show that a simple calculation would be parallelized. But it
doesn't seem to be! Every RDD count is run on the same executor, as far as
I can see. It's such a small example that the simplest explanation to me is
that I misunderstood something :-( I didn't mention that I am running with
"--master yarn", though as far as I know, nothing has changed from the yarn
defaults Cloudera provides.

Graham






On Fri, Jan 27, 2017 at 4:48 AM, Takeshi Yamamuro 
wrote:

> Probably, he referred to the word-couting example in kinesis here:
> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/
> scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L114
>
>
> On Fri, Jan 27, 2017 at 6:41 PM, ayan guha  wrote:
>
>> Maybe a naive question: why are you creating 1 Dstream per shard? It
>> should be one Dstream corresponding to kinesis stream, isn't it?
>>
>> On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> Just a guess though, Kinesis shards sometimes have skew data.
>>> So, before you compute something from kinesis RDDs, you'd be better to
>>> repartition them
>>> for better parallelism.
>>>
>>> // maropu
>>>
>>> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark  wrote:
>>>
 Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera)
 to read information from Kinesis and write it to HDFS in parquet format.
 The write seems very slow, and if I understood Spark's diagnostics
 correctly, always seemed to run from the same executor, one partition after
 the other, serially. So I stripped the program down to this:


 val kinesisStreams = (0 until numShards).map { i => {

   KinesisUtils.createStream(streamingContext, sparkApplicationName,

 kinesisStreamName, kinesisUrl, awsRegion,
 InitialPositionInStream.LATEST)

 new Duration(streamingInterval.millis),
 StorageLevel.MEMORY_AND_DISK_SER,

 awsCredentials.accessKey, awsCredentials.secretKey)

 }}

 val allKinesisStreams = streamingContext.union(kinesisStreams)

 allKinesisStreams.foreachRDD {

rdd => {

   info("total for this batch is " + rdd.count())

}
 }

 The Kinesis stream has 20 shards (overprovisioned for this small test).
 I confirmed using a small boto program that data is periodically written to
 all 20 of the shards. I can see that Spark has created 20 executors, one
 for each Kinesis shard. It also creates one other executor, tied to a
 particular worker node, and that node seems to do the RDD counting. The
 streaming interval is 1 minute, during which time several shards have
 received data. Each minute interval, for this particular example, the
 driver prints out between 20 and 30 for the count value. I expected to see
 the count operation parallelized across the cluster. I think I must just be
 misunderstanding something fundamental! Can anyone point out where I'm
 going wrong?

 Yours in confusion,
 Graham


>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Text

2017-01-27 Thread Md. Rezaul Karim
Some operations like map, filter, flatMap and coalesce (with shuffle=false)
usually preserve the order. However, operations such as sortBy, reduceByKey,
partitionBy, and join do not.

Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 27 January 2017 at 09:44, Soheila S.  wrote:

> Hi All,
> I read a test file using sparkContext.textfile(filename) and assign it to
> an RDD and process the RDD (replace some words) and finally write it to
> a text file using rdd.saveAsTextFile(output).
> Is there any way to be sure the order of the sentences will not be
> changed? I need to have the same text with some corrected words.
>
> thanks!
>
> Soheila
>


Re: spark 2.02 error when writing to s3

2017-01-27 Thread Steve Loughran
OK

Nobody should be committing output directly to S3 without having something add 
a consistency layer on top, not if you want reliable (as in "doesn't 
lose/corrupt data" reliable) work.

On 26 Jan 2017, at 19:09, VND Tremblay, Paul 
> wrote:

This seems to have done the trick, although I am not positive. If I have time, 
I'll test spinning up a cluster with and without consistent view to pin point 
the error.

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and use s3:// uri.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html
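
(For reference, a sketch of what the consistent-view toggle looks like as an EMR configuration classification; the property name is taken from the EMR docs linked above, so please double-check it there:)

[
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent": "true"
    }
  }
]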

 Original message 
From: Steve Loughran
Date:20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro ,user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may show it. You may be seeing that effect; even after the DELETE has got 
rid of the files, a listing sees something there, And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers "tombstones" to propagate around 
s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_


From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv



My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the 
problem is caused by a sync problem. With large datasets, Spark tries 
to write multiple times and causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics 

Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread Takeshi Yamamuro
Probably, he referred to the word-couting example in kinesis here:
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L114


On Fri, Jan 27, 2017 at 6:41 PM, ayan guha  wrote:

> Maybe a naive question: why are you creating 1 Dstream per shard? It
> should be one Dstream corresponding to kinesis stream, isn't it?
>
> On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Just a guess though, Kinesis shards sometimes have skew data.
>> So, before you compute something from kinesis RDDs, you'd be better to
>> repartition them
>> for better parallelism.
>>
>> // maropu
>>
>> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark  wrote:
>>
>>> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera)
>>> to read information from Kinesis and write it to HDFS in parquet format.
>>> The write seems very slow, and if I understood Spark's diagnostics
>>> correctly, always seemed to run from the same executor, one partition after
>>> the other, serially. So I stripped the program down to this:
>>>
>>>
>>> val kinesisStreams = (0 until numShards).map { i => {
>>>
>>>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>>
>>> kinesisStreamName, kinesisUrl, awsRegion,
>>> InitialPositionInStream.LATEST)
>>>
>>> new Duration(streamingInterval.millis),
>>> StorageLevel.MEMORY_AND_DISK_SER,
>>>
>>> awsCredentials.accessKey, awsCredentials.secretKey)
>>>
>>> }}
>>>
>>> val allKinesisStreams = streamingContext.union(kinesisStreams)
>>>
>>> allKinesisStreams.foreachRDD {
>>>
>>>rdd => {
>>>
>>>   info("total for this batch is " + rdd.count())
>>>
>>>}
>>> }
>>>
>>> The Kinesis stream has 20 shards (overprovisioned for this small test).
>>> I confirmed using a small boto program that data is periodically written to
>>> all 20 of the shards. I can see that Spark has created 20 executors, one
>>> for each Kinesis shard. It also creates one other executor, tied to a
>>> particular worker node, and that node seems to do the RDD counting. The
>>> streaming interval is 1 minute, during which time several shards have
>>> received data. Each minute interval, for this particular example, the
>>> driver prints out between 20 and 30 for the count value. I expected to see
>>> the count operation parallelized across the cluster. I think I must just be
>>> misunderstanding something fundamental! Can anyone point out where I'm
>>> going wrong?
>>>
>>> Yours in confusion,
>>> Graham
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
---
Takeshi Yamamuro


Text

2017-01-27 Thread Soheila S.
Hi All,
I read a test file using sparkContext.textFile(filename), assign it to
an RDD, process the RDD (replace some words), and finally write it to
a text file using rdd.saveAsTextFile(output).
Is there any way to be sure the order of the sentences will not be changed?
I need to have the same text with some corrected words.

thanks!

Soheila


Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread ayan guha
Maybe a naive question: why are you creating 1 Dstream per shard? It should
be one Dstream corresponding to kinesis stream, isn't it?

On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> Just a guess though, Kinesis shards sometimes have skew data.
> So, before you compute something from kinesis RDDs, you'd be better to
> repartition them
> for better parallelism.
>
> // maropu
>
> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark  wrote:
>
>> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera)
>> to read information from Kinesis and write it to HDFS in parquet format.
>> The write seems very slow, and if I understood Spark's diagnostics
>> correctly, always seemed to run from the same executor, one partition after
>> the other, serially. So I stripped the program down to this:
>>
>>
>> val kinesisStreams = (0 until numShards).map { i => {
>>
>>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>
>> kinesisStreamName, kinesisUrl, awsRegion,
>> InitialPositionInStream.LATEST)
>>
>> new Duration(streamingInterval.millis),
>> StorageLevel.MEMORY_AND_DISK_SER,
>>
>> awsCredentials.accessKey, awsCredentials.secretKey)
>>
>> }}
>>
>> val allKinesisStreams = streamingContext.union(kinesisStreams)
>>
>> allKinesisStreams.foreachRDD {
>>
>>rdd => {
>>
>>   info("total for this batch is " + rdd.count())
>>
>>}
>> }
>>
>> The Kinesis stream has 20 shards (overprovisioned for this small test). I
>> confirmed using a small boto program that data is periodically written to
>> all 20 of the shards. I can see that Spark has created 20 executors, one
>> for each Kinesis shard. It also creates one other executor, tied to a
>> particular worker node, and that node seems to do the RDD counting. The
>> streaming interval is 1 minute, during which time several shards have
>> received data. Each minute interval, for this particular example, the
>> driver prints out between 20 and 30 for the count value. I expected to see
>> the count operation parallelized across the cluster. I think I must just be
>> misunderstanding something fundamental! Can anyone point out where I'm
>> going wrong?
>>
>> Yours in confusion,
>> Graham
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Best Regards,
Ayan Guha


Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread Takeshi Yamamuro
Hi,

Just a guess though, Kinesis shards sometimes have skew data.
So, before you compute something from kinesis RDDs, you'd be better to
repartition them
for better parallelism.
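
Roughly what I mean, reusing the variable names from your snippet (an untested sketch; the createStream arguments follow the 1.6 kinesis-asl overload that takes the AWS keys, and the repartition count is just a placeholder):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis.KinesisUtils

val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(streamingContext, sparkApplicationName,
    kinesisStreamName, kinesisUrl, awsRegion, InitialPositionInStream.LATEST,
    new Duration(streamingInterval.millis), StorageLevel.MEMORY_AND_DISK_SER,
    awsCredentials.accessKey, awsCredentials.secretKey)
}

val unioned = streamingContext.union(kinesisStreams)

unioned.foreachRDD { rdd =>
  // Shuffle the received blocks across the cluster before the action,
  // so the work is not pinned to the receiver nodes.
  val repartitioned = rdd.repartition(numShards)
  println("total for this batch is " + repartitioned.count())
}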

// maropu

On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark  wrote:

> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera) to
> read information from Kinesis and write it to HDFS in parquet format. The
> write seems very slow, and if I understood Spark's diagnostics correctly,
> always seemed to run from the same executor, one partition after the other,
> serially. So I stripped the program down to this:
>
>
> val kinesisStreams = (0 until numShards).map { i => {
>
>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>
> kinesisStreamName, kinesisUrl, awsRegion, InitialPositionInStream.
> LATEST)
>
> new Duration(streamingInterval.millis), StorageLevel.MEMORY_AND_DISK_
> SER,
>
> awsCredentials.accessKey, awsCredentials.secretKey)
>
> }}
>
> val allKinesisStreams = streamingContext.union(kinesisStreams)
>
> allKinesisStreams.foreachRDD {
>
>rdd => {
>
>   info("total for this batch is " + rdd.count())
>
>}
> }
>
> The Kinesis stream has 20 shards (overprovisioned for this small test). I
> confirmed using a small boto program that data is periodically written to
> all 20 of the shards. I can see that Spark has created 20 executors, one
> for each Kinesis shard. It also creates one other executor, tied to a
> particular worker node, and that node seems to do the RDD counting. The
> streaming interval is 1 minute, during which time several shards have
> received data. Each minute interval, for this particular example, the
> driver prints out between 20 and 30 for the count value. I expected to see
> the count operation parallelized across the cluster. I think I must just be
> misunderstanding something fundamental! Can anyone point out where I'm
> going wrong?
>
> Yours in confusion,
> Graham
>
>


-- 
---
Takeshi Yamamuro


Re: spark intermediate data fills up the disk

2017-01-27 Thread Takeshi Yamamuro
IIUC, once the references to RDDs are gone, the related files (e.g.,
shuffle data) of those RDDs are automatically removed by `ContextCleaner` (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L178
).
Since Spark can recompute from data sources (this is a fundamental concept
of RDDs), it seems that removing these files directly results in failed jobs.
Still, I think removing them yourself is the smarter way.

I'm not exactly sure about your query in the streaming job, though; I think your
query might cause the situation you described.
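
For the standalone worker side that kant mentions below, the relevant cleanup properties would look roughly like this in conf/spark-env.sh on each worker (values shown are the documented defaults in seconds, and the cleanup only touches directories of stopped applications):

# spark-env.sh on each standalone worker (sketch)
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
 -Dspark.worker.cleanup.interval=1800 \
 -Dspark.worker.cleanup.appDataTtl=604800"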


On Fri, Jan 27, 2017 at 1:48 PM,  wrote:

> Hi!
>
> Yes, these files are for shuffle blocks; however, they need to be cleaned as
> well, right? I had been running a streaming application for 2 days. On the
> third day my disk filled up with all the .index and .data files, and my
> assumption is that these files had been there since the start of my
> streaming application. I should have checked the timestamp before doing rm
> -rf. Please let me know if I am wrong.
>
> Sent from my iPhone
>
> On Jan 26, 2017, at 4:24 PM, Takeshi Yamamuro 
> wrote:
>
> Yea, I think so and they are the intermediate files for shuffling.
> Probably, kant checked the configuration here (
> http://spark.apache.org/docs/latest/spark-standalone.html) though, this
> is not related to the issue.
>
> // maropu
>
> On Fri, Jan 27, 2017 at 7:46 AM, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> The files are for shuffle blocks. Where did you find the docs about them?
>>
>> Jacek
>>
>> On 25 Jan 2017 8:41 p.m., "kant kodali"  wrote:
>>
>> oh sorry its actually in the documentation. I should just
>> set spark.worker.cleanup.enabled = true
>>
>> On Wed, Jan 25, 2017 at 11:30 AM, kant kodali  wrote:
>>
>>> I have bunch of .index and .data files like that fills up my disk. I am
>>> not sure what the fix is? I am running spark 2.0.2 in stand alone mode
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
>


-- 
---
Takeshi Yamamuro


Re: outdated documentation? SparkSession

2017-01-27 Thread Chetan Khatri
Not outdated at all, because other methods still have dependencies on the
SparkContext, so you have to create it.
For example,
https://gist.github.com/chetkhatri/f75c2b743e6cb2d7066188687448c5a1
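
For reference, with SparkSession the underlying SparkContext is still there for those methods (a minimal sketch):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("docs-example")
  .master("local[*]")
  .getOrCreate()

// Still available for RDD-based or legacy APIs that expect a SparkContext.
val sc = spark.sparkContext

val df = spark.range(5).toDF("id")
df.show()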

On Fri, Jan 27, 2017 at 2:06 PM, Wojciech Indyk 
wrote:

> Hi!
> In this doc http://spark.apache.org/docs/latest/programming-guide.html#
> initializing-spark initialization is described by SparkContext. Do you
> think is it reasonable to change it to SparkSession or just mentioned it at
> the end? I can prepare it and make PR for this, but want to know your
> opinion at first. The same for quickstart: http://spark.
> apache.org/docs/latest/quick-start.html#self-contained-applications
>
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk
>


outdated documentation? SparkSession

2017-01-27 Thread Wojciech Indyk
Hi!
In this doc
http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark
initialization is described with SparkContext. Do you think it is reasonable
to change it to SparkSession, or just mention it at the end? I can prepare
it and make a PR for this, but I want to know your opinion first. The same
for quickstart:
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications

--
Kind regards/ Pozdrawiam,
Wojciech Indyk