Question about In-Memory size (cache / cacheTable)

2016-10-26 Thread Prithish
Hello,

I am trying to understand how in-memory size is changing in these
situations. Specifically, why is in-memory size much higher for avro and
parquet? Are there any optimizations necessary to reduce this?

Used cacheTable on each of these:

Avro file (600 KB on disk) - in-memory size was 12 MB
Parquet file (600 KB on disk) - in-memory size was 12 MB
CSV file (3 MB on disk, same data as above) - in-memory size was 600 KB

Because of this, we would need a cluster with much more memory if we were to
cache the Avro files.
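
For reference, a minimal sketch of the comparison described above, assuming
Spark 2.x with the spark-avro package; the path is a placeholder. The cached
size appears under "Size in Memory" on the Storage tab of the Spark UI:

// Load, register and cache one of the files, then check the Storage tab.
val avroDF = spark.read.format("com.databricks.spark.avro").load("/data/file.avro")
avroDF.createOrReplaceTempView("avro_table")
spark.catalog.cacheTable("avro_table")
avroDF.count()   // force materialization of the cache before checking the UI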

Thanks for your help.

Prit


Re: csv date/timestamp type inference in spark 2.0.1

2016-10-26 Thread Hyukjin Kwon
Hi Koert,


I am curious about your case. I believe the purpose of timestampFormat and
dateFormat is to control how timestamps/dates are parsed and inferred,

not to exclude them from type inference/parsing altogether. Spark 2.0.0
actually tries to infer/parse them as well (it just fails), so I would not
expect a big performance difference.


Since this is type inference, I think it is the right behaviour for Spark to
do its best to infer the most appropriate type.

Why don't you just cast the timestamps to strings?
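
For example, a minimal sketch of that workaround, assuming an active
SparkSession named `spark` and a timestamp column named "ts" (both
placeholders, not from your code):

import org.apache.spark.sql.functions.col

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data.csv")                       // placeholder input file

val stringTs = df.withColumn("ts", col("ts").cast("string"))
stringTs.printSchema()                   // "ts" is now StringType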


Thanks.


2016-10-27 9:47 GMT+09:00 Koert Kuipers :

> i tried setting both dateFormat and timestampFormat to impossible values
> (e.g. "~|.G~z~a|wW") and it still detected my data to be TimestampType
>
> On Wed, Oct 26, 2016 at 1:15 PM, Koert Kuipers  wrote:
>
>> we had the inference of dates/timestamps when reading csv files disabled
>> in spark 2.0.0 by always setting dateFormat to something impossible (e.g.
>> dateFormat "~|.G~z~a|wW")
>>
>> i noticed in spark 2.0.1 that setting this impossible dateFormat does not
>> stop spark from inferring it is a date or timestamp type anyhow. is this
>> intentional? how do i disable inference of datetype/timestamp type now?
>>
>> thanks! koert
>>
>>
>


Re: spark infers date to be timestamp type

2016-10-26 Thread Hyukjin Kwon
Hi Koert,


Sorry, I thought you meant this was a regression between 2.0.0 and 2.0.1. I
just checked, and inferring DateType has never been supported[1].

Yes, such data is currently only inferred as timestamps.


[1]
https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala#L85-L92




2016-10-27 9:12 GMT+09:00 Anand Viswanathan :

> Hi,
>
> You can use a custom schema (with DateType) and specify dateFormat in
> .option(), or, on the Spark DataFrame side, you can cast the timestamp
> column to a date.
>
> Thanks and regards,
> Anand Viswanathan
>
> On Oct 26, 2016, at 8:07 PM, Koert Kuipers  wrote:
>
> hey,
> i create a file called test.csv with contents:
> date
> 2015-01-01
> 2016-03-05
>
> next i run this code in spark 2.0.1:
> spark.read
>   .format("csv")
>   .option("header", true)
>   .option("inferSchema", true)
>   .load("test.csv")
>   .printSchema
>
> the result is:
> root
>  |-- date: timestamp (nullable = true)
>
>
> On Wed, Oct 26, 2016 at 7:35 PM, Hyukjin Kwon  wrote:
>
>> There are now timestampFormat for TimestampType and dateFormat for
>> DateType.
>>
>> Do you mind if I ask to share your codes?
>>
>> On 27 Oct 2016 2:16 a.m., "Koert Kuipers"  wrote:
>>
>>> is there a reason a column with dates in format yyyy-mm-dd in a csv file
>>> is inferred to be TimestampType and not DateType?
>>>
>>> thanks! koert
>>>
>>
>
>


Re: Zero Data Loss in Spark with Kafka

2016-10-26 Thread Cody Koeninger
Honestly, I would stay far away from saving offsets in Zookeeper if at
all possible. It's better to store them alongside your results.
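
As an illustration only, a rough sketch of that idea with the Kafka 0.8 direct
stream used elsewhere in this thread: pull the offset ranges out of each batch
and write them out in the same step as the results, instead of going to
ZooKeeper. transform(), batchSuffix(), schema, outputPath and stream are
hypothetical placeholders, not names from the actual code:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val df = sqlContext.createDataFrame(transform(rdd), schema)    // transform() is hypothetical
  df.write.parquet(outputPath + "/" + batchSuffix(offsetRanges)) // encode offsets next to the data

  // On restart, read the last written suffix back to rebuild the fromOffsets map.
}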

On Wed, Oct 26, 2016 at 10:44 AM, Sunita Arvind  wrote:
> This is enough to get it to work:
>
> df.save(conf.getString("ParquetOutputPath")+offsetSaved, "parquet",
> SaveMode.Overwrite)
>
> And tests so far (in local env) seem good with the edits. Yet to test on the
> cluster. Cody, appreciate your thoughts on the edits.
>
> Just want to make sure I am not doing overkill or overlooking a potential
> issue.
>
> regards
>
> Sunita
>
>
> On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind 
> wrote:
>>
>> The error in the file I just shared is here:
>>
>> val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" +
>> partition._2(0);  --> this was just partition and hence there was an error
>>
>> fetching the offset.
>>
>> Still testing. Somehow Cody, your code never lead to file already exists
>> sort of errors (I am saving the output of the dstream
>> as parquet file, after converting it to a dataframe. The batch interval
>> will be 2 hrs)
>>
>> The code in the main is here:
>>
>>   val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>> conf.getString("groupId"), conf.getString("topics"))
>>val storedOffsets = offsetsStore.readOffsets()
>>  LogHandler.log.info("Fetched the offset from zookeeper")
>>
>>  val kafkaArr =  storedOffsets match {
>>case None =>
>>  // start from the initial offsets
>>
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>> kafkaProps, Set(topics))
>>
>>case Some(fromOffsets) =>
>>  // start from previously saved offsets
>>  val messageHandler: MessageAndMetadata[String, Array[Byte]] =>
>> (String, Array[Byte]) = (mmd: MessageAndMetadata[String, Array[Byte]]) =>
>> (mmd.key, mmd.message)
>>
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder,Tuple2[String,
>> Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)
>>
>>  //KafkaUtils.createRDD[String,Row,StringDecoder,ProtobufMessage,
>> (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>>  }
>>
>>  kafkaArr.foreachRDD{ (rdd,time) =>
>>
>> val schema =
>> SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
>> val ardd:RDD[Row] = rdd.mapPartitions{itr => itr.map(r =>
>> Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
>> }
>> val df = sql.createDataFrame(ardd,schema)
>>LogHandler.log.info("Created dataframe")
>>val offsetSaved =
>> offsetsStore.saveOffsets(topics,rdd).replace(":","-").replace(",","_")
>>LogHandler.log.info("Saved offset to Zookeeper")
>>df.saveAsParquetFile(conf.getString("ParquetOutputPath")+offsetSaved)
>>LogHandler.log.info("Created the parquet file")
>>  }
>>
>> Thanks
>>
>> Sunita
>>
>>
>>
>>
>>
>> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind 
>> wrote:
>>>
>>> Attached is the edited code. Am I heading in right direction? Also, I am
>>> missing something due to which, it seems to work well as long as the
>>> application is running and the files are created right. But as soon as I
>>> restart the application, it goes back to fromOffset as 0. Any thoughts?
>>>
>>> regards
>>> Sunita
>>>
>>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind 
>>> wrote:

 Thanks for confirming Cody.
 To get to use the library, I had to do:

 val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
 "/consumers/topics/"+ topics + "/0")

 It worked well. However, I had to specify the partitionId in the zkPath.
 If I want the library to pick all the partitions for a topic, without me
 specifying the path, is it possible out of the box or I need to tweak?

 regards
 Sunita


 On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger 
 wrote:
>
> You are correct that you shouldn't have to worry about broker id.
>
> I'm honestly not sure specifically what else you are asking at this
> point.
>
> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind 
> wrote:
> > Just re-read the kafka architecture. Something that slipped my mind
> > is, it
> > is leader based. So topic/partitionId pair will be same on all the
> > brokers.
> > So we do not need to consider brokerid while storing offsets. Still
> > exploring rest of the items.
> > regards
> > Sunita
> >
> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind
> > 
> > wrote:
> >>
> >> Hello Experts,
> >>
> >> I am trying to use the saving to ZK design. Just saw Sudhir's
> >> comments
> >> that it is old approach. Any reasons for that? Any issues observed
> >> with
> >> saving to ZK. 

Re: Cogrouping or joining datasets by rownum

2016-10-26 Thread Rohit Verma
The formatting of the message got disturbed, so I am sending it again.


On Oct 27, 2016, at 8:52 AM, Rohit Verma 
> wrote:

Has anyone tried cogrouping datasets / joining datasets by row number?


DS1

  d1    d2
  40    AA
  41    BB
  42    CC
  43    DD


DS2

  s1    s2
  IN    INDIA
  AU    Australia


joined

  rowNum    d1    d2    s1               s2
  1         40    AA    IN               INDIA
  2         41    BB    AU               Australia
  3         42    CC    null or empty    null or empty
  4         43    DD    null or empty    null or empty


I don't expect complete code; some pointers on how to do this would be sufficient.

I tried the row_number function to start:

spark.range(100,200).withColumn("id",row_number()).show();

but it's throwing an error:

java.lang.UnsupportedOperationException: Cannot evaluate expression: rownumber()

Thanks
Rohit
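
One possible pointer, as a rough sketch only: zip each dataset's underlying
RDD with its row index and join on that index. (The row_number() error above
appears because row_number is a window function and needs a window
specification via .over(Window.orderBy(...)).) ds1 and ds2 are the Datasets
from the example above; the names are illustrative:

val ds1Indexed = ds1.rdd.zipWithIndex.map { case (row, idx) => (idx, row) }
val ds2Indexed = ds2.rdd.zipWithIndex.map { case (row, idx) => (idx, row) }

val joined = ds1Indexed
  .fullOuterJoin(ds2Indexed)                     // the missing side becomes None
  .sortByKey()
  .map { case (idx, (left, right)) => (idx + 1, left, right) }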



Cogrouping or joining datasets by rownum

2016-10-26 Thread Rohit Verma
Has anyone tried cogrouping datasets / joining datasets by row number?

e.g.
DS 1

43 AA
44 BB
45 CB

DS2

IN india
AU australia


I want to get:

rownum   ds1.1 ds1.2   ds2.1 ds2.2

1 43 AA IN india
2 44 BB AU australia
3 45 CB null null

I don't expect complete code; some pointers on how to do this would be sufficient.

I tried the row_number function to start:

spark.range(100,200).withColumn("id",row_number()).show();

but it's throwing an error:

java.lang.UnsupportedOperationException: Cannot evaluate expression: rownumber()

Thanks
Rohit


Re: Dataframe schema...

2016-10-26 Thread Michael Armbrust
On Fri, Oct 21, 2016 at 8:40 PM, Koert Kuipers  wrote:

> This rather innocent looking optimization flag nullable has caused a lot
> of bugs... Makes me wonder if we are better off without it
>

Yes... my most regretted design decision :(

Please give thoughts here: https://issues.apache.org/jira/browse/SPARK-17939


Re: csv date/timestamp type inference in spark 2.0.1

2016-10-26 Thread Koert Kuipers
i tried setting both dateFormat and timestampFormat to impossible values
(e.g. "~|.G~z~a|wW") and it still detected my data to be TimestampType

On Wed, Oct 26, 2016 at 1:15 PM, Koert Kuipers  wrote:

> we had the inference of dates/timestamps when reading csv files disabled
> in spark 2.0.0 by always setting dateFormat to something impossible (e.g.
> dateFormat "~|.G~z~a|wW")
>
> i noticed in spark 2.0.1 that setting this impossible dateFormat does not
> stop spark from inferring it is a date or timestamp type anyhow. is this
> intentional? how do i disable inference of datetype/timestamp type now?
>
> thanks! koert
>
>


Re: spark infers date to be timestamp type

2016-10-26 Thread Anand Viswanathan
Hi,

You can use a custom schema (with DateType) and specify dateFormat in .option(),
or, on the Spark DataFrame side, you can cast the timestamp column to a date.
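
A rough sketch of both options, reusing the test.csv example from the quoted
message below; the schema itself is an assumption:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DateType, StructField, StructType}

// Option 1: supply a custom schema with DateType and a dateFormat option
val schema = StructType(Seq(StructField("date", DateType, nullable = true)))
val df1 = spark.read
  .option("header", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .schema(schema)
  .csv("test.csv")

// Option 2: keep the inferred timestamp and cast it to a date afterwards
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("test.csv")
  .withColumn("date", col("date").cast("date"))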

Thanks and regards,
Anand Viswanathan

> On Oct 26, 2016, at 8:07 PM, Koert Kuipers  wrote:
> 
> hey,
> i create a file called test.csv with contents:
> date
> 2015-01-01
> 2016-03-05
> 
> next i run this code in spark 2.0.1:
> spark.read
>   .format("csv")
>   .option("header", true)
>   .option("inferSchema", true)
>   .load("test.csv")
>   .printSchema
> 
> the result is:
> root
>  |-- date: timestamp (nullable = true)
> 
> 
> On Wed, Oct 26, 2016 at 7:35 PM, Hyukjin Kwon  > wrote:
> There are now timestampFormat for TimestampType and dateFormat for DateType.
> 
> Do you mind if I ask to share your codes?
> 
> 
> On 27 Oct 2016 2:16 a.m., "Koert Kuipers"  > wrote:
> is there a reason a column with dates in format yyyy-mm-dd in a csv file is
> inferred to be TimestampType and not DateType?
> 
> thanks! koert
> 



Re: spark infers date to be timestamp type

2016-10-26 Thread Koert Kuipers
hey,
i create a file called test.csv with contents:
date
2015-01-01
2016-03-05

next i run this code in spark 2.0.1:
spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("test.csv")
  .printSchema

the result is:
root
 |-- date: timestamp (nullable = true)


On Wed, Oct 26, 2016 at 7:35 PM, Hyukjin Kwon  wrote:

> There are now timestampFormat for TimestampType and dateFormat for
> DateType.
>
> Do you mind if I ask to share your codes?
>
> On 27 Oct 2016 2:16 a.m., "Koert Kuipers"  wrote:
>
>> is there a reason a column with dates in format yyyy-mm-dd in a csv file
>> is inferred to be TimestampType and not DateType?
>>
>> thanks! koert
>>
>


Re: spark infers date to be timestamp type

2016-10-26 Thread Hyukjin Kwon
There are now timestampFormat for TimestampType and dateFormat for DateType.

Do you mind if I ask to share your codes?

On 27 Oct 2016 2:16 a.m., "Koert Kuipers"  wrote:

> is there a reason a column with dates in format yyyy-mm-dd in a csv file
> is inferred to be TimestampType and not DateType?
>
> thanks! koert
>


Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-26 Thread Michael Armbrust
I think that there should be comments that show the expressions that are
getting compiled.  Maybe make a gist with the whole generated code fragment?

On Wed, Oct 26, 2016 at 3:45 PM, Efe Selcuk  wrote:

> I do plan to do that Michael. Do you happen to know of any guidelines for
> tracking down the context of this generated code?
>
> On Wed, Oct 26, 2016 at 3:42 PM Michael Armbrust 
> wrote:
>
>> If you have a reproduction you can post for this, it would be great if
>> you could open a JIRA.
>>
>> On Mon, Oct 24, 2016 at 6:21 PM, Efe Selcuk  wrote:
>>
>> I have an application that works in 2.0.0 but has been dying at runtime
>> on the 2.0.1 distribution.
>>
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$
>> CodeGenerator$$doCompile(CodeGenerator.scala:893)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
>> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.
>> loadFuture(LocalCache.java:3599)
>> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(
>> LocalCache.java:2379)
>> ... 30 more
>> Caused by: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 74, Column 145: Unknown variable or type "value4"
>>
>> It also includes a massive 1800-line generated code output (which repeats
>> over and over, even on 1 thread, which makes this a pain), but fortunately
>> the error occurs early so I can give at least some context.
>>
>> /* 001 */ public java.lang.Object generate(Object[] references) {
>> /* 002 */   return new SpecificMutableProjection(references);
>> /* 003 */ }
>> /* 004 */
>> /* 005 */ class SpecificMutableProjection extends
>> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
>> /* 006 */
>> /* 007 */   private Object[] references;
>> /* 008 */   private MutableRow mutableRow;
>> /* 009 */   private Object[] values;
>> ... // many lines of class variables, mostly errMsg strings and Object[]
>> /* 071 */   private void apply2_7(InternalRow i) {
>> /* 072 */
>> /* 073 */ boolean isNull215 = false;
>> /* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null :
>> (com.mypackage.MyThing) value4._2();
>> /* 075 */ isNull215 = value215 == null;
>> /* 076 */
>> ...
>>
>> As you can see, on line 74 there's a reference to value4 but nothing
>> called value4 has been defined. I have no idea of where to even begin
>> looking for what caused this, or even whether it's my fault or a bug in the
>> code generation. Any help is appreciated.
>>
>> Efe
>>
>>
>>


Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-26 Thread Efe Selcuk
I do plan to do that Michael. Do you happen to know of any guidelines for
tracking down the context of this generated code?

On Wed, Oct 26, 2016 at 3:42 PM Michael Armbrust 
wrote:

> If you have a reproduction you can post for this, it would be great if you
> could open a JIRA.
>
> On Mon, Oct 24, 2016 at 6:21 PM, Efe Selcuk  wrote:
>
> I have an application that works in 2.0.0 but has been dying at runtime on
> the 2.0.1 distribution.
>
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
> at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
> at
> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> at
> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> ... 30 more
> Caused by: org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 74, Column 145: Unknown variable or type "value4"
>
> It also includes a massive 1800-line generated code output (which repeats
> over and over, even on 1 thread, which makes this a pain), but fortunately
> the error occurs early so I can give at least some context.
>
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificMutableProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificMutableProjection extends
> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> ... // many lines of class variables, mostly errMsg strings and Object[]
> /* 071 */   private void apply2_7(InternalRow i) {
> /* 072 */
> /* 073 */ boolean isNull215 = false;
> /* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null :
> (com.mypackage.MyThing) value4._2();
> /* 075 */ isNull215 = value215 == null;
> /* 076 */
> ...
>
> As you can see, on line 74 there's a reference to value4 but nothing
> called value4 has been defined. I have no idea of where to even begin
> looking for what caused this, or even whether it's my fault or a bug in the
> code generation. Any help is appreciated.
>
> Efe
>
>
>


Re: [Spark 2.0.1] Error in generated code, possible regression?

2016-10-26 Thread Michael Armbrust
If you have a reproduction you can post for this, it would be great if you
could open a JIRA.

On Mon, Oct 24, 2016 at 6:21 PM, Efe Selcuk  wrote:

> I have an application that works in 2.0.0 but has been dying at runtime on
> the 2.0.1 distribution.
>
> at org.apache.spark.sql.catalyst.expressions.codegen.
> CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$
> CodeGenerator$$doCompile(CodeGenerator.scala:893)
> at org.apache.spark.sql.catalyst.expressions.codegen.
> CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
> at org.apache.spark.sql.catalyst.expressions.codegen.
> CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.
> loadFuture(LocalCache.java:3599)
> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(
> LocalCache.java:2379)
> ... 30 more
> Caused by: org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 74, Column 145: Unknown variable or type "value4"
>
> It also includes a massive 1800-line generated code output (which repeats
> over and over, even on 1 thread, which makes this a pain), but fortunately
> the error occurs early so I can give at least some context.
>
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificMutableProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificMutableProjection extends
> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private MutableRow mutableRow;
> /* 009 */   private Object[] values;
> ... // many lines of class variables, mostly errMsg strings and Object[]
> /* 071 */   private void apply2_7(InternalRow i) {
> /* 072 */
> /* 073 */ boolean isNull215 = false;
> /* 074 */ final com.mypackage.MyThing value215 = isNull215 ? null :
> (com.mypackage.MyThing) value4._2();
> /* 075 */ isNull215 = value215 == null;
> /* 076 */
> ...
>
> As you can see, on line 74 there's a reference to value4 but nothing
> called value4 has been defined. I have no idea of where to even begin
> looking for what caused this, or even whether it's my fault or a bug in the
> code generation. Any help is appreciated.
>
> Efe
>
>


Re: Resiliency with SparkStreaming - fileStream

2016-10-26 Thread Michael Armbrust
I'll answer in the context of structured streaming (the new streaming API
build on DataFrames). When reading from files, the FileSource, records
which files are included in each batch inside of the given
checkpointLocation.  If you fail in the middle of a batch, the streaming
engine will retry that batch next time the query is restarted.

If you are concerned about exactly-once semantics, you can get that too.
The FileSink (i.e. using writeStream) writing out to something like parquet
does this automatically.  If you are writing to something like a
transactional database yourself, you can also implement similar
functionality.  Specifically, you can record the partition and version that
are provided by the open method into the database in the same transaction that
is writing the data.  This way, when you recover you can avoid writing the
same updates more than once.
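
A minimal sketch of the file-source/file-sink pattern described above,
assuming Spark 2.x structured streaming; the HDFS paths are placeholders:

// Each micro-batch records its input files under the checkpointLocation, so a
// failed batch is retried when the query restarts.
val input = spark.readStream.text("hdfs:///incoming")   // file source; schema is value: string

val query = input.writeStream
  .format("parquet")                                    // FileSink: exactly-once output
  .option("checkpointLocation", "hdfs:///checkpoints/fileQuery")
  .start("hdfs:///results")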

On Wed, Oct 26, 2016 at 9:20 AM, Scott W  wrote:

> Hello,
>
> I'm planning to use fileStream Spark streaming API to stream data from
> HDFS. My Spark job would essentially process these files and post the
> results to an external endpoint.
>
> *How does fileStream API handle checkpointing of the file it processed ? *In
> other words, if my Spark job failed while posting the results to an
> external endpoint, I want that same original file to be picked up again and
> get reprocessed.
>
> Thanks much!
>


No of partitions in a Dataframe

2016-10-26 Thread Nipun Parasrampuria
How do I find the number of partitions in a dataframe without converting
the dataframe to an RDD(I'm assuming that it's a costly operation).

If there's no way to do so, I wonder why the API doesn't include a method
like that(an explanation for why such a method would be useless, perhaps)

Thanks!
Nipun


Re: Need help with SVM

2016-10-26 Thread Robin East
It looks like the training is over-regularised - dropping the regParam to 0.1 
or 0.01 should resolve the problem.

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 26 Oct 2016, at 11:05, Aseem Bansal  wrote:
> 
> He replied to me. Forwarding to the mailing list. 
> 
> -- Forwarded message --
> From: Aditya Vyas >
> Date: Tue, Oct 25, 2016 at 8:16 PM
> Subject: Re: Need help with SVM
> To: Aseem Bansal >
> 
> 
> Hello,
> Here is the public 
> gist:https://gist.github.com/aditya1702/760cd5c95a6adf2447347e0b087bc318 
> 
> 
> Do tell if you need more information
> 
> Regards,
> Aditya
> 
> On Tue, Oct 25, 2016 at 8:11 PM, Aseem Bansal  > wrote:
> Is there any labeled point with label 0 in your dataset? 
> 
> On Tue, Oct 25, 2016 at 2:13 AM, aditya1702  > wrote:
> Hello,
> I am using linear SVM to train my model and generate a line through my data.
> However my model always predicts 1 for all the feature examples. Here is my
> code:
> 
> print data_rdd.take(5)
> [LabeledPoint(1.0, [1.9643,4.5957]), LabeledPoint(1.0, [2.2753,3.8589]),
> LabeledPoint(1.0, [2.9781,4.5651]), LabeledPoint(1.0, [2.932,3.5519]),
> LabeledPoint(1.0, [3.5772,2.856])]
> 
> 
> from pyspark.mllib.classification import SVMWithSGD
> from pyspark.mllib.linalg import Vectors
> from sklearn.svm import SVC
> data_rdd=x_df.map(lambda x:LabeledPoint(x[1],x[0]))
> 
> model = SVMWithSGD.train(data_rdd, iterations=1000,regParam=1)
> 
> X=x_df.map(lambda x:x[0]).collect()
> Y=x_df.map(lambda x:x[1]).collect()
> 
> 
> pred=[]
> for i in X:
>   pred.append(model.predict(i))
> print pred
> 
> [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1]
> 
> 
> My dataset is as follows:
> [screenshot attachment: Screen_Shot_2016-10-25_at_2.png]
> 
> 
> Can someone please help?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Need-help-with-SVM-tp27955.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 
> 
> 
> 
> 



Reading old tweets from twitter in spark

2016-10-26 Thread Cassa L
Hi,
I am using Spark Streaming to read tweets from twitter. It works fine. Now
I want to be able to fetch older tweets in my spark code. Twitter4j has API
to set date
http://twitter4j.org/oldjavadocs/4.0.4/twitter4j/Query.html

Is there a way to set this using TwitterUtils or do I need to write
different code?


Thanks.
LCassa


Spark Metrics monitoring using Graphite

2016-10-26 Thread Sreekanth Jella
Hi All,
I am trying to retrieve the Spark metrics using the Graphite exporter. It seems
that by default it exposes the application ID, but as per our requirements we
need the application name.

Sample GraphiteExporter data:
block_manager{application="local-1477496809940",executor_id="driver",instance="
127.0.0.1:9108
",job="spark_graphite_exp",qty="remainingMem_MB",type="memory"}

In the above entry, "application" defaults to the application ID. How do I
configure it to report the application name instead of the ID?

Thanks,
Sreekanth.


Executor shutdown hook and initialization

2016-10-26 Thread Walter rakoff
Hello,

Is there a way I can add an init() call when an executor is created? I'd
like to initialize a few connections that are part of my singleton object,
preferably before the executor runs its first task.
Along the same lines, how can I provide a shutdown hook that cleans up these
connections on termination?

Thanks
Walt


Re: Will Spark SQL completely replace Apache Impala or Apache Hive?

2016-10-26 Thread neil90
No. Spark SQL is part of Spark, which is a processing engine.

Apache Hive is a data warehouse on top of Hadoop.
Apache Impala is both a data warehouse (utilizing the Hive metastore) and a
processing engine.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Will-Spark-SQL-completely-replace-Apache-Impala-or-Apache-Hive-tp27958p27963.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




spark infers date to be timestamp type

2016-10-26 Thread Koert Kuipers
is there a reason a column with dates in format yyyy-mm-dd in a csv file is
inferred to be TimestampType and not DateType?

thanks! koert


csv date/timestamp type inference in spark 2.0.1

2016-10-26 Thread Koert Kuipers
we had the inference of dates/timestamps when reading csv files disabled in
spark 2.0.0 by always setting dateFormat to something impossible (e.g.
dateFormat "~|.G~z~a|wW")

i noticed in spark 2.0.1 that setting this impossible dateFormat does not
stop spark from inferring it is a date or timestamp type anyhow. is this
intentional? how do i disable inference of datetype/timestamp type now?

thanks! koert


Resiliency with SparkStreaming - fileStream

2016-10-26 Thread Scott W
Hello,

I'm planning to use fileStream Spark streaming API to stream data from
HDFS. My Spark job would essentially process these files and post the
results to an external endpoint.

*How does fileStream API handle checkpointing of the file it processed ? *In
other words, if my Spark job failed while posting the results to an
external endpoint, I want that same original file to be picked up again and
get reprocessed.

Thanks much!


CSV conversion

2016-10-26 Thread Nathan Kronenfeld
We are finally converting from Spark 1.6 to Spark 2.0, and are finding one
barrier we can't get past.

In the past, we converted CSV RDDs (not files) to DataFrames using
DataBricks SparkCSV library - creating a CsvParser and calling
parser.csvRdd.

The current incarnation of spark-csv seems only to have a CSV file format
exposed, and the only entry points we can find are when reading files.

What is the modern pattern for converting an already-read RDD of CSV lines
into a dataframe?

Thanks,
Nathan Kronenfeld
Uncharted Software


Re: Zero Data Loss in Spark with Kafka

2016-10-26 Thread Sunita Arvind
This is enough to get it to work:

df.save(conf.getString("ParquetOutputPath")+offsetSaved, "parquet",
SaveMode.Overwrite)

And tests so far (in local env) seem good with the edits. Yet to test
on the cluster. Cody, appreciate your thoughts on the edits.

Just want to make sure I am not doing overkill or overlooking a potential
issue.

regards

Sunita


On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind 
wrote:

> The error in the file I just shared is here:
>
> val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" + 
> partition._2(0);  --> this was just partition and hence there was an error
>
> fetching the offset.
>
> Still testing. Somehow Cody, your code never lead to file already exists sort 
> of errors (I am saving the output of the dstream
> as parquet file, after converting it to a dataframe. The batch interval will 
> be 2 hrs)
>
> The code in the main is here:
>
>   val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> conf.getString("groupId"), conf.getString("topics"))
>val storedOffsets = offsetsStore.readOffsets()
>  LogHandler.log.info("Fetched the offset from zookeeper")
>
>  val kafkaArr =  storedOffsets match {
>case None =>
>  // start from the initial offsets
>  
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaProps, Set(topics))
>
>case Some(fromOffsets) =>
>  // start from previously saved offsets
>  val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, 
> Array[Byte]) = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, 
> mmd.message)
>  
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder,Tuple2[String,
>  Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)
>
>  //KafkaUtils.createRDD[String,Row,StringDecoder,ProtobufMessage, 
> (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>  }
>
>  kafkaArr.foreachRDD{ (rdd,time) =>
>
> val schema = 
> SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
> val ardd:RDD[Row] = rdd.mapPartitions{itr => itr.map(r => 
> Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
> }
> val df = sql.createDataFrame(ardd,schema)
>LogHandler.log.info("Created dataframe")
>val offsetSaved =  
> offsetsStore.saveOffsets(topics,rdd).replace(":","-").replace(",","_")
>LogHandler.log.info("Saved offset to Zookeeper")
>df.saveAsParquetFile(conf.getString("ParquetOutputPath")+offsetSaved)
>LogHandler.log.info("Created the parquet file")
>  }
>
> Thanks
>
> Sunita
>
>
>
>
>
> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind 
> wrote:
>
>> Attached is the edited code. Am I heading in right direction? Also, I am
>> missing something due to which, it seems to work well as long as the
>> application is running and the files are created right. But as soon as I
>> restart the application, it goes back to fromOffset as 0. Any thoughts?
>>
>> regards
>> Sunita
>>
>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind 
>> wrote:
>>
>>> Thanks for confirming Cody.
>>> To get to use the library, I had to do:
>>>
>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
>>> "/consumers/topics/"+ topics + "/0")
>>>
>>> It worked well. However, I had to specify the partitionId in the
>>> zkPath.  If I want the library to pick all the partitions for a topic,
>>> without me specifying the path, is it possible out of the box or I need to
>>> tweak?
>>>
>>> regards
>>> Sunita
>>>
>>>
>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger 
>>> wrote:
>>>
 You are correct that you shouldn't have to worry about broker id.

 I'm honestly not sure specifically what else you are asking at this
 point.

 On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind 
 wrote:
 > Just re-read the kafka architecture. Something that slipped my mind
 is, it
 > is leader based. So topic/partitionId pair will be same on all the
 brokers.
 > So we do not need to consider brokerid while storing offsets. Still
 > exploring rest of the items.
 > regards
 > Sunita
 >
 > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <
 sunitarv...@gmail.com>
 > wrote:
 >>
 >> Hello Experts,
 >>
 >> I am trying to use the saving to ZK design. Just saw Sudhir's
 comments
 >> that it is old approach. Any reasons for that? Any issues observed
 with
 >> saving to ZK. The way we are planning to use it is:
 >> 1. Following
 >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
 g-zero-data-loss.html
 >> 2. Saving to the same file with offsetRange as a part of the file.
 We hope
 >> that there are no partial writes/ overwriting is possible and
 offsetRanges
 >>
 >> 

Re: Any Dynamic Compilation of Scala Query

2016-10-26 Thread Vadim Semenov
You can use Cloudera Livy for that https://github.com/cloudera/livy
take a look at this example https://github.com/cloudera/livy#spark-example

On Wed, Oct 26, 2016 at 4:35 AM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> Is there any way to dynamically execute a string  which has scala code
> against spark engine. We are dynamically creating scala file, we would
> like to submit this scala file to Spark, but currently spark accepts
> only JAR file has input from Remote Job submission. Is there any other
> way to submit .SCALA instead of .JAR to REST API of Spark ?
>
> /MS
>
>


Re: Any Dynamic Compilation of Scala Query

2016-10-26 Thread Manjunath, Kiran
Hi,

Can you elaborate, with a sample example, on why you would want to do this?
Ideally there would be a better approach than solving the problem as described
below.

A sample example would help in understanding the problem.

Regards,
Kiran

From: Mahender Sarangam 
Date: Wednesday, October 26, 2016 at 2:05 PM
To: user 
Subject: Any Dynamic Compilation of Scala Query

Hi,

Is there any way to dynamically execute a string  which has scala code
against spark engine. We are dynamically creating scala file, we would
like to submit this scala file to Spark, but currently spark accepts
only JAR file has input from Remote Job submission. Is there any other
way to submit .SCALA instead of .JAR to REST API of Spark ?

/MS




Re: What syntax can be used to specify the latest version of JAR found while using spark submit

2016-10-26 Thread Sudev A C
Hi Aseem,

If you are submitting the jar from a shell you could write a simple bash/sh
script to solve your problem.

`print /home/pathtojarfolder/$(ls -t /home/pathtojarfolder/*.jar | head -n
1)`

The above command can be put in your spark-submit command.

Thanks
Sudev

On Wed, Oct 26, 2016 at 3:33 PM, Aseem Bansal  wrote:

> Hi
>
> Can someone please share their thoughts on http://stackoverflow.com/
> questions/40259022/what-syntax-can-be-used-to-specify-
> the-latest-version-of-jar-found-while-using-s
>


Re: pyspark doesn't recognize MMM dateFormat pattern in spark.read.load() for dates like 1989Dec31 and 31Dec1989

2016-10-26 Thread Pietro Pugni
And what if the month abbreviation is upper-case? Java doesn't parse the
month name if, for example, it's "JAN" instead of "Jan" or "DEC" instead of
"Dec". Is it possible to solve this issue without using UDFs?

Many thanks again
 Pietro


> Il giorno 24 ott 2016, alle ore 17:33, Pietro Pugni  
> ha scritto:
> 
> This worked without setting other options:
> spark/bin/spark-submit --conf 
> "spark.driver.extraJavaOptions=-Duser.language=en" test.py
> 
> Thank you again!
>  Pietro
> 
>> Il giorno 24 ott 2016, alle ore 17:18, Sean Owen > > ha scritto:
>> 
>> I believe it will be too late to set it there, and these are JVM flags, not 
>> app or Spark flags. See spark.driver.extraJavaOptions and likewise for the 
>> executor.
>> 
>> On Mon, Oct 24, 2016 at 4:04 PM Pietro Pugni > > wrote:
>> Thank you!
>> 
>> I tried again setting locale options in different ways but doesn’t propagate 
>> to the JVM. I tested these strategies (alone and all together):
>> - bin/spark-submit --conf 
>> "spark.executor.extraJavaOptions=-Duser.language=en -Duser.region=US 
>> -Duser.country=US -Duser.timezone=GMT” test.py
>> - spark = SparkSession \
>>  .builder \
>>  .appName("My app") \
>>  .config("spark.executor.extraJavaOptions", "-Duser.language=en 
>> -Duser.region=US -Duser.country=US -Duser.timezone=GMT") \
>>  .config("user.country", "US") \
>>  .config("user.region", "US") \
>>  .config("user.language", "en") \
>>  .config("user.timezone", "GMT") \
>>  .config("-Duser.country", "US") \
>>  .config("-Duser.region", "US") \
>>  .config("-Duser.language", "en") \
>>  .config("-Duser.timezone", "GMT") \
>>  .getOrCreate()
>> - export JAVA_OPTS="-Duser.language=en -Duser.region=US -Duser.country=US 
>> -Duser.timezone=GMT”
>> - export LANG="en_US.UTF-8”
>> 
>> After running export LANG="en_US.UTF-8” from the same terminal session I use 
>> to launch spark-submit, if I run locale command I get correct values:
>> LANG="en_US.UTF-8"
>> LC_COLLATE="en_US.UTF-8"
>> LC_CTYPE="en_US.UTF-8"
>> LC_MESSAGES="en_US.UTF-8"
>> LC_MONETARY="en_US.UTF-8"
>> LC_NUMERIC="en_US.UTF-8"
>> LC_TIME="en_US.UTF-8"
>> LC_ALL=
>> 
>> While running my pyspark script, from the Spark UI,  under Environment -> 
>> Spark Properties the locale appear to be correctly set:
>> - user.country: US
>> - user.language: en
>> - user.region: US
>> - user.timezone: GMT
>> 
>> but Environment -> System Properties still reports the System locale and not 
>> the session locale I previously set:
>> - user.country: IT
>> - user.language: it
>> - user.timezone: Europe/Rome
>> 
>> Am I wrong or the options don’t propagate to the JVM correctly?
>> 
>> 
> 



Fwd: Need help with SVM

2016-10-26 Thread Aseem Bansal
He replied to me. Forwarding to the mailing list.

-- Forwarded message --
From: Aditya Vyas 
Date: Tue, Oct 25, 2016 at 8:16 PM
Subject: Re: Need help with SVM
To: Aseem Bansal 


Hello,
Here is the public gist:https://gist.github.com/a
ditya1702/760cd5c95a6adf2447347e0b087bc318

Do tell if you need more information

Regards,
Aditya

On Tue, Oct 25, 2016 at 8:11 PM, Aseem Bansal  wrote:

> Is there any labeled point with label 0 in your dataset?
>
> On Tue, Oct 25, 2016 at 2:13 AM, aditya1702 
> wrote:
>
>> Hello,
>> I am using linear SVM to train my model and generate a line through my
>> data.
>> However my model always predicts 1 for all the feature examples. Here is
>> my
>> code:
>>
>> print data_rdd.take(5)
>> [LabeledPoint(1.0, [1.9643,4.5957]), LabeledPoint(1.0, [2.2753,3.8589]),
>> LabeledPoint(1.0, [2.9781,4.5651]), LabeledPoint(1.0, [2.932,3.5519]),
>> LabeledPoint(1.0, [3.5772,2.856])]
>>
>> 
>> 
>> from pyspark.mllib.classification import SVMWithSGD
>> from pyspark.mllib.linalg import Vectors
>> from sklearn.svm import SVC
>> data_rdd=x_df.map(lambda x:LabeledPoint(x[1],x[0]))
>>
>> model = SVMWithSGD.train(data_rdd, iterations=1000,regParam=1)
>>
>> X=x_df.map(lambda x:x[0]).collect()
>> Y=x_df.map(lambda x:x[1]).collect()
>>
>> 
>> 
>> pred=[]
>> for i in X:
>>   pred.append(model.predict(i))
>> print pred
>>
>> [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
>> 1,
>> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
>> 1]
>>
>>
>> My dataset is as follows:
>> [screenshot attachment: Screen_Shot_2016-10-25_at_2.png]
>>
>>
>> Can someone please help?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Need-help-with-SVM-tp27955.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


What syntax can be used to specify the latest version of JAR found while using spark submit

2016-10-26 Thread Aseem Bansal
Hi

Can someone please share their thoughts on
http://stackoverflow.com/questions/40259022/what-syntax-can-be-used-to-specify-the-latest-version-of-jar-found-while-using-s


Is there length limit for sparksql/hivesql?

2016-10-26 Thread Jone Zhang
Is there a length limit for Spark SQL / Hive SQL statements?
Can ANTLR still work well if the SQL is very long?

Thanks.


Can application JAR name contain + for dependency resolution to latest version?

2016-10-26 Thread Aseem Bansal
Hi

While using spark-submit to submit Spark jobs, is the exact name of the JAR
file necessary? Or is there a way to use something like `1.0.+` to denote the
latest version found?


Re: Need help with SVM

2016-10-26 Thread Robin East
As per Aseem's point, what do you get from
data_rdd.toDF.groupBy("label").count.show




> On 25 Oct 2016, at 15:41, Aseem Bansal  wrote:
> 
> Is there any labeled point with label 0 in your dataset? 
> 
> On Tue, Oct 25, 2016 at 2:13 AM, aditya1702  > wrote:
> Hello,
> I am using linear SVM to train my model and generate a line through my data.
> However my model always predicts 1 for all the feature examples. Here is my
> code:
> 
> print data_rdd.take(5)
> [LabeledPoint(1.0, [1.9643,4.5957]), LabeledPoint(1.0, [2.2753,3.8589]),
> LabeledPoint(1.0, [2.9781,4.5651]), LabeledPoint(1.0, [2.932,3.5519]),
> LabeledPoint(1.0, [3.5772,2.856])]
> 
> 
> from pyspark.mllib.classification import SVMWithSGD
> from pyspark.mllib.linalg import Vectors
> from sklearn.svm import SVC
> data_rdd=x_df.map(lambda x:LabeledPoint(x[1],x[0]))
> 
> model = SVMWithSGD.train(data_rdd, iterations=1000,regParam=1)
> 
> X=x_df.map(lambda x:x[0]).collect()
> Y=x_df.map(lambda x:x[1]).collect()
> 
> 
> pred=[]
> for i in X:
>   pred.append(model.predict(i))
> print pred
> 
> [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
> 1]
> 
> 
> My dataset is as follows:
> [screenshot attachment: Screen_Shot_2016-10-25_at_2.png]
> 
> 
> Can someone please help?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Need-help-with-SVM-tp27955.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 
> 
> 



Re: HiveContext is Serialized?

2016-10-26 Thread Mich Talebzadeh
Thanks Sean.

I believe you are referring to below statement

"You can't use the HiveContext or SparkContext in a distribution operation.
It has nothing to do with for loops.

The fact that they're serializable is misleading. It's there, I believe,
because these objects may be inadvertently referenced in the closure of a
function that executes remotely, yet doesn't use the context. The closure
cleaner can't always remove this reference. The task would fail to
serialize even though it doesn't use the context. You will find these
objects serialize but then don't work if used remotely."
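
As a small sketch of that distinction (illustrative only; the table names are
placeholders):

// Fine: the HiveContext is used only on the driver, over a local Scala collection.
Seq("table_a", "table_b").foreach { t =>
  hiveContext.sql(s"SELECT COUNT(*) FROM $t").show()
}

// Not fine: the HiveContext is captured by a closure that runs on executors.
// It may serialize, but it will not work remotely.
sc.parallelize(Seq("table_a", "table_b")).map { t =>
  hiveContext.sql(s"SELECT COUNT(*) FROM $t").count()   // fails at runtime
}.collect()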



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 26 October 2016 at 09:27, Sean Owen  wrote:

> Yes, but the question here is why the context objects are marked
> serializable when they are not meant to be sent somewhere as bytes. I tried
> to answer that apparent inconsistency below.
>
>
> On Wed, Oct 26, 2016, 10:21 Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> Sorry for asking this rather naïve question.
>>
>> The notion of serialisation in Spark and where it can be serialised or
>> not. Does this generally refer to the concept of serialisation in the
>> context of data storage?
>>
>> In this context for example with reference to RDD operations is
>> it process of translating object state into a format that can be stored
>> and retrieved from memory buffer?
>>
>> Thanks
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 26 October 2016 at 09:06, Sean Owen  wrote:
>>
>> It is the driver that has the info needed to schedule and manage
>> distributed jobs and that is by design.
>>
>> This is narrowly about using the HiveContext or SparkContext directly. Of
>> course SQL operations are distributed.
>>
>>
>> On Wed, Oct 26, 2016, 10:03 Mich Talebzadeh 
>> wrote:
>>
>> Hi Sean,
>>
>> Your point:
>>
>> "You can't use the HiveContext or SparkContext in a distribution
>> operation..."
>>
>> Is this because of design issue?
>>
>> Case in point if I created a DF from RDD and register it as a tempTable,
>> does this imply that any sql calls on that table islocalised and not
>> distributed among executors?
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 26 October 2016 at 06:43, Ajay Chander  wrote:
>>
>> Sean, thank you for making it clear. It was helpful.
>>
>> Regards,
>> Ajay
>>
>>
>> On Wednesday, October 26, 2016, Sean Owen  wrote:
>>
>> This usage is fine, because you are only using the HiveContext locally on
>> the driver. It's applied in a function that's used on a Scala collection.
>>
>> You can't use the HiveContext or SparkContext in a distribution
>> operation. It has nothing to do with for loops.
>>
>> The fact that they're serializable is misleading. It's there, I believe,
>> because these objects may be inadvertently referenced in the closure of a
>> function that executes remotely, yet doesn't use the context. The closure
>> cleaner can't always remove this reference. The task would fail to
>> serialize even though it doesn't use the context. You will find these
>> objects serialize but then don't work if used remotely.
>>
>> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
>> 

Any Dynamic Compilation of Scala Query

2016-10-26 Thread Mahender Sarangam
Hi,

Is there any way to dynamically execute a string containing Scala code
against the Spark engine? We are dynamically creating a Scala file and would
like to submit it to Spark, but currently Spark accepts only a JAR file as
input for remote job submission. Is there any other way to submit a .scala
file instead of a .jar to Spark's REST API?

/MS



Re: HiveContext is Serialized?

2016-10-26 Thread Sean Owen
Yes, but the question here is why the context objects are marked
serializable when they are not meant to be sent somewhere as bytes. I tried
to answer that apparent inconsistency below.

On Wed, Oct 26, 2016, 10:21 Mich Talebzadeh 
wrote:

> Hi,
>
> Sorry for asking this rather naïve question.
>
> The notion of serialisation in Spark and where it can be serialised or
> not. Does this generally refer to the concept of serialisation in the
> context of data storage?
>
> In this context for example with reference to RDD operations is it process
> of translating object state into a format that can be stored and
> retrieved from memory buffer?
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 26 October 2016 at 09:06, Sean Owen  wrote:
>
> It is the driver that has the info needed to schedule and manage
> distributed jobs and that is by design.
>
> This is narrowly about using the HiveContext or SparkContext directly. Of
> course SQL operations are distributed.
>
>
> On Wed, Oct 26, 2016, 10:03 Mich Talebzadeh 
> wrote:
>
> Hi Sean,
>
> Your point:
>
> "You can't use the HiveContext or SparkContext in a distribution
> operation..."
>
> Is this because of design issue?
>
> Case in point if I created a DF from RDD and register it as a tempTable,
> does this imply that any sql calls on that table islocalised and not
> distributed among executors?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 26 October 2016 at 06:43, Ajay Chander  wrote:
>
> Sean, thank you for making it clear. It was helpful.
>
> Regards,
> Ajay
>
>
> On Wednesday, October 26, 2016, Sean Owen  wrote:
>
> This usage is fine, because you are only using the HiveContext locally on
> the driver. It's applied in a function that's used on a Scala collection.
>
> You can't use the HiveContext or SparkContext in a distribution operation.
> It has nothing to do with for loops.
>
> The fact that they're serializable is misleading. It's there, I believe,
> because these objects may be inadvertently referenced in the closure of a
> function that executes remotely, yet doesn't use the context. The closure
> cleaner can't always remove this reference. The task would fail to
> serialize even though it doesn't use the context. You will find these
> objects serialize but then don't work if used remotely.
>
> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
> IIRC.
>
> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>
> Hi Everyone,
>
> I was thinking if I can use hiveContext inside foreach like below,
>
> object Test {
>   def main(args: Array[String]): Unit = {
>
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val hiveContext = new HiveContext(sc)
>
> val dataElementsFile = args(0)
> val deDF = 
> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>
> def calculate(de: Row) {
>   val dataElement = de.getAs[String]("DataElement").trim
>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
> TEST_DB.TEST_TABLE1 ")
>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
> }
>
> deDF.collect().foreach(calculate)
>   }
> }
>
>
> I looked at 
> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>  and I see it is extending SqlContext which extends Logging with Serializable.
>
> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>
> Regards,
>
> Ajay
>
>
>
>


Re: HiveContext is Serialized?

2016-10-26 Thread Mich Talebzadeh
Hi,

Sorry for asking this rather naïve question.

I have a question about the notion of serialisation in Spark and where things
can or cannot be serialised. Does this generally refer to the concept of
serialisation in the context of data storage?

In this context, for example with reference to RDD operations, is it the
process of translating object state into a format that can be stored in and
retrieved from a memory buffer?

Thanks




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 26 October 2016 at 09:06, Sean Owen  wrote:

> It is the driver that has the info needed to schedule and manage
> distributed jobs and that is by design.
>
> This is narrowly about using the HiveContext or SparkContext directly. Of
> course SQL operations are distributed.
>
>
> On Wed, Oct 26, 2016, 10:03 Mich Talebzadeh 
> wrote:
>
>> Hi Sean,
>>
>> Your point:
>>
>> "You can't use the HiveContext or SparkContext in a distribution
>> operation..."
>>
>> Is this because of design issue?
>>
>> Case in point if I created a DF from RDD and register it as a tempTable,
>> does this imply that any sql calls on that table islocalised and not
>> distributed among executors?
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 26 October 2016 at 06:43, Ajay Chander  wrote:
>>
>> Sean, thank you for making it clear. It was helpful.
>>
>> Regards,
>> Ajay
>>
>>
>> On Wednesday, October 26, 2016, Sean Owen  wrote:
>>
>> This usage is fine, because you are only using the HiveContext locally on
>> the driver. It's applied in a function that's used on a Scala collection.
>>
>> You can't use the HiveContext or SparkContext in a distribution
>> operation. It has nothing to do with for loops.
>>
>> The fact that they're serializable is misleading. It's there, I believe,
>> because these objects may be inadvertently referenced in the closure of a
>> function that executes remotely, yet doesn't use the context. The closure
>> cleaner can't always remove this reference. The task would fail to
>> serialize even though it doesn't use the context. You will find these
>> objects serialize but then don't work if used remotely.
>>
>> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
>> IIRC.
>>
>> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>> val conf = new SparkConf()
>> val sc = new SparkContext(conf)
>> val hiveContext = new HiveContext(sc)
>>
>> val dataElementsFile = args(0)
>> val deDF = 
>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>
>> def calculate(de: Row) {
>>   val dataElement = de.getAs[String]("DataElement").trim
>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>> TEST_DB.TEST_TABLE1 ")
>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>> }
>>
>> deDF.collect().foreach(calculate)
>>   }
>> }
>>
>>
>> I looked at 
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>  and I see it is extending SqlContext which extends Logging with 
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>
>>


Re: HiveContext is Serialized?

2016-10-26 Thread Sean Owen
It is the driver that has the info needed to schedule and manage
distributed jobs and that is by design.

This is narrowly about using the HiveContext or SparkContext directly. Of
course SQL operations are distributed.

On Wed, Oct 26, 2016, 10:03 Mich Talebzadeh 
wrote:

> Hi Sean,
>
> Your point:
>
> "You can't use the HiveContext or SparkContext in a distribution
> operation..."
>
> Is this because of a design issue?
>
> Case in point: if I create a DF from an RDD and register it as a tempTable,
> does this imply that any SQL calls on that table are localised and not
> distributed among executors?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 26 October 2016 at 06:43, Ajay Chander  wrote:
>
> Sean, thank you for making it clear. It was helpful.
>
> Regards,
> Ajay
>
>
> On Wednesday, October 26, 2016, Sean Owen  wrote:
>
> This usage is fine, because you are only using the HiveContext locally on
> the driver. It's applied in a function that's used on a Scala collection.
>
> You can't use the HiveContext or SparkContext in a distribution operation.
> It has nothing to do with for loops.
>
> The fact that they're serializable is misleading. It's there, I believe,
> because these objects may be inadvertently referenced in the closure of a
> function that executes remotely, yet doesn't use the context. The closure
> cleaner can't always remove this reference. The task would fail to
> serialize even though it doesn't use the context. You will find these
> objects serialize but then don't work if used remotely.
>
> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
> IIRC.
>
> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>
> Hi Everyone,
>
> I was thinking if I can use hiveContext inside foreach like below,
>
> object Test {
>   def main(args: Array[String]): Unit = {
>
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val hiveContext = new HiveContext(sc)
>
> val dataElementsFile = args(0)
> val deDF = 
> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>
> def calculate(de: Row) {
>   val dataElement = de.getAs[String]("DataElement").trim
>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
> TEST_DB.TEST_TABLE1 ")
>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
> }
>
> deDF.collect().foreach(calculate)
>   }
> }
>
>
> I looked at 
> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>  and I see it is extending SqlContext which extends Logging with Serializable.
>
> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>
> Regards,
>
> Ajay
>
>
>


Re: HiveContext is Serialized?

2016-10-26 Thread ayan guha
In your use case, your deDF need not be a DataFrame. You could use
sc.textFile().collect().
Even better, you can just read off a local file, as your file is very small,
unless you are planning to use yarn cluster mode.
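
A rough sketch of that suggestion, reusing sc, dataElementsFile and calculate
from the code quoted below (only illustrative, and calculate would need to take
a String instead of a Row):

// Option 1: still read via Spark, but pull the small list straight to the driver.
val dataElements: Array[String] =
  sc.textFile(dataElementsFile).map(_.trim).distinct().collect()

// Option 2: read the file directly on the driver (e.g. yarn client mode), no RDD at all.
import scala.io.Source
val dataElementsLocal: List[String] =
  Source.fromFile(dataElementsFile).getLines().map(_.trim).toList.distinct

// Then loop on the driver; each Hive query still runs as a distributed job.
dataElements.foreach(calculate)
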
On 26 Oct 2016 16:43, "Ajay Chander"  wrote:

> Sean, thank you for making it clear. It was helpful.
>
> Regards,
> Ajay
>
> On Wednesday, October 26, 2016, Sean Owen  wrote:
>
>> This usage is fine, because you are only using the HiveContext locally on
>> the driver. It's applied in a function that's used on a Scala collection.
>>
>> You can't use the HiveContext or SparkContext in a distribution
>> operation. It has nothing to do with for loops.
>>
>> The fact that they're serializable is misleading. It's there, I believe,
>> because these objects may be inadvertently referenced in the closure of a
>> function that executes remotely, yet doesn't use the context. The closure
>> cleaner can't always remove this reference. The task would fail to
>> serialize even though it doesn't use the context. You will find these
>> objects serialize but then don't work if used remotely.
>>
>> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
>> IIRC.
>>
>> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>>
>>> Hi Everyone,
>>>
>>> I was thinking if I can use hiveContext inside foreach like below,
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>> val conf = new SparkConf()
>>> val sc = new SparkContext(conf)
>>> val hiveContext = new HiveContext(sc)
>>>
>>> val dataElementsFile = args(0)
>>> val deDF = 
>>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>> def calculate(de: Row) {
>>>   val dataElement = de.getAs[String]("DataElement").trim
>>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>>> TEST_DB.TEST_TABLE1 ")
>>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>> }
>>>
>>> deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>>
>>> I looked at 
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>  and I see it is extending SqlContext which extends Logging with 
>>> Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it ? Thanks for your 
>>> time.
>>>
>>> Regards,
>>>
>>> Ajay
>>>
>>>


Re: HiveContext is Serialized?

2016-10-26 Thread Mich Talebzadeh
Hi Sean,

Your point:

"You can't use the HiveContext or SparkContext in a distribution
operation..."

Is this because of a design issue?

Case in point: if I create a DF from an RDD and register it as a tempTable,
does this imply that any SQL calls on that table are localised and not
distributed among executors?
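
Concretely, the scenario in question would look something like this (a sketch
only; the data and the temp table name are made up, with sc and hiveContext as
in the code quoted below):

case class Rec(id: Int, name: String)

val rdd = sc.parallelize(Seq(Rec(1, "a"), Rec(2, "b")))
val df = hiveContext.createDataFrame(rdd)
df.registerTempTable("my_temp_table")

// Issued from the driver -- but is the query evaluated only there,
// or distributed among the executors?
hiveContext.sql("SELECT name, count(*) FROM my_temp_table GROUP BY name").show()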

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 26 October 2016 at 06:43, Ajay Chander  wrote:

> Sean, thank you for making it clear. It was helpful.
>
> Regards,
> Ajay
>
>
> On Wednesday, October 26, 2016, Sean Owen  wrote:
>
>> This usage is fine, because you are only using the HiveContext locally on
>> the driver. It's applied in a function that's used on a Scala collection.
>>
>> You can't use the HiveContext or SparkContext in a distribution
>> operation. It has nothing to do with for loops.
>>
>> The fact that they're serializable is misleading. It's there, I believe,
>> because these objects may be inadvertently referenced in the closure of a
>> function that executes remotely, yet doesn't use the context. The closure
>> cleaner can't always remove this reference. The task would fail to
>> serialize even though it doesn't use the context. You will find these
>> objects serialize but then don't work if used remotely.
>>
>> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
>> IIRC.
>>
>> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander  wrote:
>>
>>> Hi Everyone,
>>>
>>> I was thinking if I can use hiveContext inside foreach like below,
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>> val conf = new SparkConf()
>>> val sc = new SparkContext(conf)
>>> val hiveContext = new HiveContext(sc)
>>>
>>> val dataElementsFile = args(0)
>>> val deDF = 
>>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>> def calculate(de: Row) {
>>>   val dataElement = de.getAs[String]("DataElement").trim
>>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>>> TEST_DB.TEST_TABLE1 ")
>>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>> }
>>>
>>> deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>>
>>> I looked at 
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>  and I see it is extending SqlContext which extends Logging with 
>>> Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it ? Thanks for your 
>>> time.
>>>
>>> Regards,
>>>
>>> Ajay
>>>
>>>