Challenges with Datasource V2 API

2019-06-25 Thread Sunita Arvind
Hello Spark Experts,

I am having challenges using the DataSource V2 API. I created a mock project to reproduce the issue (linked below).

The input partitions seem to be created correctly. The below output
confirms that:

19/06/23 16:00:21 INFO root: createInputPartitions
19/06/23 16:00:21 INFO root: Create a partition for abc

The InputPartitionReader seems to have fetched the data correctly as well;
however, on the cluster it appears to loop indefinitely between the next()
and get() calls of the InputPartitionReader.
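
For reference, Spark drives an InputPartitionReader by calling next() and then get() in a loop until next() returns false, so a next() that never returns false (or that re-reads the source on every call) produces exactly this behaviour. A minimal sketch of a terminating reader, assuming the Spark 2.4 DataSource V2 interfaces (the class name and backing sequence are hypothetical):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

// Minimal sketch: a reader over a pre-materialized sequence of rows.
// The crucial part is that next() advances an iterator and eventually
// returns false, so Spark stops calling get().
class SeqPartitionReader(rows: Seq[InternalRow]) extends InputPartitionReader[InternalRow] {
  private val iter = rows.iterator
  private var current: InternalRow = _

  override def next(): Boolean = {
    if (iter.hasNext) { current = iter.next(); true }
    else false                       // signals end of partition
  }

  override def get(): InternalRow = current

  override def close(): Unit = ()    // release any underlying resources here
}
```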

I tried to mock this, and here is the code for the mockup:
https://github.com/skopp002/SparkDatasourceV2.git

However, the issue does not surface in the mock project. One concern that
does show up is the duplication of records, which I had noticed once in
production as well. There is only one record with a usage value of
"1.2006451E7" in mockdata.json, but there are multiple copies of it in the
load result. Could this be what effectively produces infinite data in
production? In production, even for a few KBs of input I hit the error below.
```
2019-06-23 16:07:29 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (50 times so far)
2019-06-23 16:07:31 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (51 times so far)
2019-06-23 16:07:33 INFO UnsafeExternalSorter:209 - Thread 47 spilling sort data of 1984.0 MB to disk (52 times so far)
```

But I could not reproduce the exact error in the mock project; probably the
data is too small to surface the problem.
Can someone review the code and tell me if I am doing something wrong?

regards
Sunita


Re: a way to allow spark job to continue despite task failures?

2018-01-24 Thread Sunita Arvind
Had a similar situation and landed on this question.
Finally I was able to make it do what I needed by cheating the Spark driver
:)
i.e. by setting a very high value for "--conf spark.task.maxFailures=800".
I made it 800 deliberately; the default is typically 4. By the time 800
attempts for the failing tasks were exhausted, the other tasks had completed.
You can set it to a higher or lower value depending on how many more tasks
you have and how long they take to complete.
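
For reference, a sketch of the programmatic equivalent of that flag (the app name is a placeholder; the property must be set before the SparkContext is created):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Raise the per-task failure tolerance well above the default of 4 so a few
// permanently failing tasks do not bring the whole job down immediately.
val conf = new SparkConf()
  .setAppName("lenient-job")
  .set("spark.task.maxFailures", "800")
val sc = new SparkContext(conf)
```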

regards
Sunita

On Fri, Nov 13, 2015 at 4:50 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> I searched the code base and looked at:
> https://spark.apache.org/docs/latest/running-on-yarn.html
>
> I didn't find mapred.max.map.failures.percent or its counterpart.
>
> FYI
>
> On Fri, Nov 13, 2015 at 9:05 AM, Nicolae Marasoiu <
> nicolae.maras...@adswizz.com> wrote:
>
>> Hi,
>>
>>
>> I know a task can fail 2 times and only the 3rd breaks the entire job.
>>
>> I am good with this number of attempts.
>>
>> I would like that after trying a task 3 times, it continues with the
>> other tasks.
>>
>> The job can be "failed", but I want all tasks run.
>>
>> Please see my use case.
>>
>>
>> I read a hadoop input set, and some gzip files are incomplete. I would
>> like to just skip them and the only way I see is to tell Spark to ignore
>> some tasks permanently failing, if it is possible. With traditional hadoop
>> map-reduce this was possible using mapred.max.map.failures.percent.
>>
>>
>> Do map-reduce params like mapred.max.map.failures.percent apply to
>> Spark/YARN map-reduce jobs ?
>>
>> I edited $HADOOP_CONF_DIR/mapred-site.xml and
>> added mapred.max.map.failures.percent=30, but it does not seem to apply; the
>> job still failed after 3 task attempts.
>>
>>
>> Should Spark transmit this parameter? Or do the mapred.* settings not apply?
>>
>> Are other Hadoop parameters (e.g. the ones involved in input reading, rather
>> than in the "processing" or "application" side, like this max.map.failures)
>> taken into account and transmitted? I saw that Spark should scan
>> HADOOP_CONF_DIR and forward those, but I guess this does not apply to every
>> parameter, since Spark has its own task distribution and DAG stage processing
>> logic, which just happens to have a YARN implementation.
>>
>>
>> Do you know a way to do this in Spark - to ignore a predefined number of
>> tasks fail, but allow the job to continue? This way I could see all the
>> faulty input files in one job run, delete them all and continue with the
>> rest.
>>
>>
>> Just to mention, doing a manual gzip -t on top of hadoop cat is infeasible;
>> map-reduce is much faster at scanning the 15K files worth 70GB (it's doing
>> 25M/s per node), while old-style hadoop cat does much less.
>>
>>
>> Thanks,
>>
>> Nicu
>>
>
>


Re: Chaining Spark Streaming Jobs

2017-11-02 Thread Sunita Arvind
Sorry Michael, I ended up using Kafka and missed your message.
Yes, I did specify the schema with read.schema, and that's when I got:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)

regards

Sunita


On Mon, Sep 18, 2017 at 10:15 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> You specify the schema when loading a dataframe by calling
> spark.read.schema(...)...
>
> On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Hi Michael,
>>
>> I am wondering what I am doing wrong. I get error like:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Schema
>> must be specified when creating a streaming source DataFrame. If some files
>> already exist in the directory, then depending on the file format you may
>> be able to create a static DataFrame on that directory with
>> 'spark.read.load(directory)' and infer schema from it.
>> at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceSchema(DataSource.scala:223)
>> at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceInfo$lzycompute(DataSource.scala:87)
>> at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceInfo(DataSource.scala:87)
>> at org.apache.spark.sql.execution.streaming.StreamingRelation$.
>> apply(StreamingRelation.scala:30)
>> at org.apache.spark.sql.streaming.DataStreamReader.load(
>> DataStreamReader.scala:125)
>> at org.apache.spark.sql.streaming.DataStreamReader.load(
>> DataStreamReader.scala:134)
>> at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregate
>> s.scala:23)
>> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
>> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.
>> java:144)
>> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>>
>>
>> I tried specifying the schema as well.
>> Here is my code:
>>
>> object Aggregates {
>>
>>   val aggregation=
>> """select sum(col1), sum(col2), id, first(name)
>>   from enrichedtb
>>   group by id
>> """.stripMargin
>>
>>   def aggregator(conf:Config)={
>> implicit val spark = 
>> SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>> implicit val sqlctx = spark.sqlContext
>> printf("Source path is" + conf.getString("source.path"))
>> val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // 
>> Added this as it was complaining about schema.
>> val df=spark.readStream.format("parquet").option("inferSchema", 
>> true).schema(schemadf.schema).load(conf.getString("source.path"))
>> df.createOrReplaceTempView("enrichedtb")
>> val res = spark.sql(aggregation)
>> 
>> res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>> val mainconf = ConfigFactory.load()
>> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>> print(conf.toString)
>> aggregator(conf)
>>   }
>>
>> }
>>
>>
>> I tried to extract schema from static read of the input path and provided it 
>> to the readStream API. With that, I get this error:
>>
>> at 
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>>  at 
>> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>>  at 
>> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>>  at 
>> org.apache.spark.sql.streaming.StreamingQueryManager

Change the owner of hdfs file being saved

2017-11-02 Thread Sunita Arvind
Hello Experts,

I am required to use a specific user id to save files on a remote HDFS
cluster. Remote in the sense that the Spark jobs run on EMR and write to a CDH
cluster, hence I cannot change hdfs-site.xml etc. to point to the destination
cluster. As a result I am using webhdfs to save the files into it.

There are a few challenges I have with this approach:
1. I cannot use the namenode's nameservice and have to specify the IP
address of the active namenode, which is risky when there is a failover.

2. I cannot change the owner/group of the file being written by Spark. I
see no option to provide an owner for the files being written (
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
).

3. Using JDBC, so that I can specify the user name and password, would mean
I end up creating managed tables only. This is not acceptable for our
use case.

Is there a way to change the owner of files written by Spark?
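
For reference, one possible workaround (a sketch only, not something verified in this thread) is to set the owner explicitly after the write through the Hadoop FileSystem API over webhdfs. This assumes the connecting user is permitted to chown on the destination cluster; the host, port, path, user and group below are placeholders:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// After Spark has finished writing, change the owner/group of the output
// directory on the remote cluster via webhdfs.
val hadoopConf = new Configuration()
val remoteFs = FileSystem.get(new URI("webhdfs://active-namenode:50070"), hadoopConf)
remoteFs.setOwner(new Path("/data/output/parquet"), "targetuser", "targetgroup")
```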

regards
Sunita


Re: Chaining Spark Streaming Jobs

2017-09-13 Thread Sunita Arvind
Thanks for your suggestion, Vincent. I do not have much experience with Akka
as such; I will explore this option.

On Tue, Sep 12, 2017 at 11:01 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> What about chaining with akka or akka stream and the fair scheduler ?
>
> On 13 Sept 2017 at 01:51, "Sunita Arvind" <sunitarv...@gmail.com> wrote:
>
> Hi Michael,
>
> I am wondering what I am doing wrong. I get error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
> at org.apache.spark.sql.execution.datasources.DataSource.
> sourceSchema(DataSource.scala:223)
> at org.apache.spark.sql.execution.datasources.DataSource.
> sourceInfo$lzycompute(DataSource.scala:87)
> at org.apache.spark.sql.execution.datasources.DataSource.
> sourceInfo(DataSource.scala:87)
> at org.apache.spark.sql.execution.streaming.StreamingRelation$.
> apply(StreamingRelation.scala:30)
> at org.apache.spark.sql.streaming.DataStreamReader.load(
> DataStreamReader.scala:125)
> at org.apache.spark.sql.streaming.DataStreamReader.load(
> DataStreamReader.scala:134)
> at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregate
> s.scala:23)
> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
> ssorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
> thodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.
> java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation=
> """select sum(col1), sum(col2), id, first(name)
>   from enrichedtb
>   group by id
> """.stripMargin
>
>   def aggregator(conf:Config)={
> implicit val spark = 
> SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
> implicit val sqlctx = spark.sqlContext
> printf("Source path is" + conf.getString("source.path"))
> val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // 
> Added this as it was complaining about schema.
> val df=spark.readStream.format("parquet").option("inferSchema", 
> true).schema(schemadf.schema).load(conf.getString("source.path"))
> df.createOrReplaceTempView("enrichedtb")
> val res = spark.sql(aggregation)
> 
> res.writeStream.format("parquet").outputMode("append").option("checkpointLocation",conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
> val mainconf = ConfigFactory.load()
> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
> print(conf.toString)
> aggregator(conf)
>   }
>
> }
>
>
> I tried to extract schema from static read of the input path and provided it 
> to the readStream API. With that, I get this error:
>
> at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>   at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster all paths point to S3. In my laptop, they 
> all point to local filesystem.
>
> I am using Spark2.2.0
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>

Re: Chaining Spark Streaming Jobs

2017-09-12 Thread Sunita Arvind
Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must
be specified when creating a streaming source DataFrame. If some files
already exist in the directory, then depending on the file format you may
be able to create a static DataFrame on that directory with
'spark.read.load(directory)' and infer schema from it.
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook


I tried specifying the schema as well.
Here is my code:

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
       from enrichedtb
       group by id
    """.stripMargin

  def aggregator(conf: Config) = {
    implicit val spark =
      SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is" + conf.getString("source.path"))
    // Added this as it was complaining about schema.
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
    val df = spark.readStream.format("parquet").option("inferSchema", true)
      .schema(schemadf.schema).load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)

    res.writeStream.format("parquet").outputMode("append")
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    print(conf.toString)
    aggregator(conf)
  }

}


I tried to extract the schema from a static read of the input path and
provided it to the readStream API. With that, I get this error:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster, all paths point to S3. On my laptop, they
all point to the local filesystem.

I am using Spark 2.2.0.
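
For context, in Spark 2.2 the UnsupportedOperationChecker rejects a streaming aggregation written with outputMode("append") unless an event-time watermark is defined, which is consistent with the aggregation and append output mode in the code above. A minimal sketch of an append-mode aggregation with a watermark (the event-time column, window size and paths are placeholders, not part of the original job):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("agg-sketch").getOrCreate()
import spark.implicits._

// Streaming file sources require an explicit schema; borrow it from a static read.
val schema = spark.read.parquet("/data/enriched").schema
val input  = spark.readStream.schema(schema).parquet("/data/enriched")

// The watermark plus event-time window is what makes append output mode legal
// for an aggregation.
val agg = input
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"), $"id")
  .agg(sum($"col1"), sum($"col2"), first($"name"))

agg.writeStream
  .format("parquet")
  .outputMode("append")
  .option("checkpointLocation", "/data/checkpoints/agg")
  .start("/data/output/agg")
```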

Appreciate your help.

regards

Sunita


On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the ra

Re: Chaining Spark Streaming Jobs

2017-09-08 Thread Sunita Arvind
Thanks for your response, Praneeth. We did consider Kafka; however, cost was
the only holdback factor, as we might need a larger cluster, and the existing
cluster is on premise while my app is in the cloud, so the same cluster cannot
be used. But I agree it does sound like a good alternative.

Regards
Sunita

On Thu, Sep 7, 2017 at 11:24 PM Praneeth Gayam <praneeth.ga...@gmail.com>
wrote:

> With file stream you will have to deal with the following
>
>1. The file(s) must not be changed once created. So if the files are
>being continuously appended, the new data will not be read. Refer
>
> <https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources>
>2. The files must be created in the dataDirectory by atomically
>*moving* or *renaming* them into the data directory.
>
> Since the latency requirements for the second job in the chain is only a
> few mins, you may have to end up creating a new file every few mins
>
> You may want to consider Kafka as your intermediary store for building a
> chain/DAG of streaming jobs
>
> On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Thanks for your response Michael
>> Will try it out.
>>
>> Regards
>> Sunita
>>
>> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> If you use structured streaming and the file sink, you can have a
>>> subsequent stream read using the file source.  This will maintain exactly
>>> once processing even if there are hiccups or failures.
>>>
>>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>>
>>>> Hello Spark Experts,
>>>>
>>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>>>> enrichment on the same dataframe after persisting the raw data, however, in
>>>> order to modularize I am planning to have a separate job which picks up the
>>>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>>>> 1 job as the enrichments could get project specific while raw data
>>>> persistence stays customer/project agnostic.The enriched data is allowed to
>>>> have some latency (few minutes)
>>>>
>>>> My challenge is, after persisting the raw data, how do I chain the next
>>>> streaming job. The only way I can think of is -  job 1 (raw data)
>>>> partitions on current date (MMDD) and within current date, the job 2
>>>> (enrichment job) filters for records within 60s of current time and
>>>> performs enrichment on it in 60s batches.
>>>> Is this a good option? It seems to be error prone. When either of the
>>>> jobs get delayed due to bursts or any error/exception this could lead to
>>>> huge data losses and non-deterministic behavior . What are other
>>>> alternatives to this?
>>>>
>>>> Appreciate any guidance in this regard.
>>>>
>>>> regards
>>>> Sunita Koppar
>>>>
>>>
>>>
>


Re: Chaining Spark Streaming Jobs

2017-09-07 Thread Sunita Arvind
Thanks for your response Michael
Will try it out.

Regards
Sunita

On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust <mich...@databricks.com>
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>> 1 job as the enrichments could get project specific while raw data
>> persistence stays customer/project agnostic.The enriched data is allowed to
>> have some latency (few minutes)
>>
>> My challenge is, after persisting the raw data, how do I chain the next
>> streaming job. The only way I can think of is -  job 1 (raw data)
>> partitions on current date (MMDD) and within current date, the job 2
>> (enrichment job) filters for records within 60s of current time and
>> performs enrichment on it in 60s batches.
>> Is this a good option? It seems to be error prone. When either of the
>> jobs get delayed due to bursts or any error/exception this could lead to
>> huge data losses and non-deterministic behavior . What are other
>> alternatives to this?
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita Koppar
>>
>
>


Chaining Spark Streaming Jobs

2017-08-21 Thread Sunita Arvind
Hello Spark Experts,

I have a design question w.r.t. Spark Streaming. I have a streaming job that
consumes protocol-buffer-encoded real-time logs from a Kafka cluster on
premise. My Spark application runs on EMR (AWS) and persists data onto S3.
Before I persist, I need to strip the header and convert the protobuf to
Parquet (I use sparksql-scalapb to convert from protobuf to Spark.sql.Row). I
need to persist the raw logs as-is. I can continue the enrichment on the same
dataframe after persisting the raw data; however, in order to modularize, I am
planning to have a separate job which picks up the raw data and performs
enrichment on it. Also, I am trying to avoid an all-in-one job, as the
enrichments could get project specific while raw data persistence stays
customer/project agnostic. The enriched data is allowed to have some latency
(a few minutes).

My challenge is, after persisting the raw data, how do I chain the next
streaming job? The only way I can think of is: job 1 (raw data) partitions on
the current date (MMDD), and within the current date, job 2 (the enrichment
job) filters for records within 60s of the current time and performs
enrichment on them in 60s batches.
Is this a good option? It seems error prone. When either of the jobs gets
delayed due to bursts or any error/exception, this could lead to huge data
losses and non-deterministic behavior. What are other alternatives to this?
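
For reference, a minimal sketch of the file-sink-to-file-source chaining suggested in the replies above (all paths, the Kafka options and the schema source are placeholders; the two queries could just as well live in separate applications):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("chained-streams").getOrCreate()

// Stage 1: persist the raw stream with the file sink (exactly-once with checkpointing).
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "rawlogs")
  .load()

raw.writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/raw")
  .start("/data/raw")

// Stage 2: read the same directory back as a streaming file source and enrich.
val rawSchema = spark.read.parquet("/data/raw").schema   // file sources need an explicit schema
val enriched  = spark.readStream.schema(rawSchema).parquet("/data/raw")
  // ... enrichment transformations go here ...

enriched.writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/enriched")
  .start("/data/enriched")
```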

Appreciate any guidance in this regard.

regards
Sunita Koppar


Writing Parquet from Avro objects - cannot write null value for numeric fields

2017-01-05 Thread Sunita Arvind
Hello Experts,

I am trying to allow null values in numeric fields. Here are the details of
the issue I have:
http://stackoverflow.com/questions/41492344/spark-avro-to-parquet-writing-null-values-in-number-fields

I also tried making all columns nullable by using the function below (from
one of the suggestions on the web):

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  df.sqlContext.createDataFrame(df.rdd,
    StructType(df.schema.map(_.copy(nullable = nullable))))
}
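
A hypothetical usage of the helper, for illustration (the input DataFrame and output path are placeholders):

```scala
// Relax nullability on every column, then write out as Parquet.
val relaxedDf = setNullableStateForAllColumns(df, nullable = true)
relaxedDf.printSchema()                  // all columns now report nullable = true
relaxedDf.write.parquet("/tmp/relaxed")
```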

The printSchema output shows that the columns are now nullable, but I still
cannot persist to Parquet with nulls in the numeric fields.

Is there a workaround? I need to be able to allow null values for numeric
fields.

Thanks in advance.

regards

Sunita


Re: Zero Data Loss in Spark with Kafka

2016-10-26 Thread Sunita Arvind
This is enough to get it to work:

df.save(conf.getString("ParquetOutputPath")+offsetSaved, "parquet",
SaveMode.Overwrite)

And tests so far (in local env) seem good with the edits. Yet to test
on the cluster. Cody, appreciate your thoughts on the edits.

Just want to make sure I am not doing overkill or overlooking a potential
issue.

regards

Sunita


On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> The error in the file I just shared is here:
>
> val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" + 
> partition._2(0);  --> this was just partition and hence there was an error
>
> fetching the offset.
>
> Still testing. Somehow Cody, your code never lead to file already exists sort 
> of errors (I am saving the output of the dstream
> as parquet file, after converting it to a dataframe. The batch interval will 
> be 2 hrs)
>
> The code in the main is here:
>
>   val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> conf.getString("groupId"), conf.getString("topics"))
>val storedOffsets = offsetsStore.readOffsets()
>  LogHandler.log.info("Fetched the offset from zookeeper")
>
>  val kafkaArr =  storedOffsets match {
>case None =>
>  // start from the initial offsets
>  
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaProps, Set(topics))
>
>case Some(fromOffsets) =>
>  // start from previously saved offsets
>  val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, 
> Array[Byte]) = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, 
> mmd.message)
>  
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder,Tuple2[String,
>  Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)
>
>  //KafkaUtils.createRDD[String,Row,StringDecoder,ProtobufMessage, 
> (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>  }
>
>  kafkaArr.foreachRDD{ (rdd,time) =>
>
> val schema = 
> SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
> val ardd:RDD[Row] = rdd.mapPartitions{itr => itr.map(r => 
> Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
> }
> val df = sql.createDataFrame(ardd,schema)
>LogHandler.log.info("Created dataframe")
>val offsetSaved =  
> offsetsStore.saveOffsets(topics,rdd).replace(":","-").replace(",","_")
>LogHandler.log.info("Saved offset to Zookeeper")
>df.saveAsParquetFile(conf.getString("ParquetOutputPath")+offsetSaved)
>LogHandler.log.info("Created the parquet file")
>  }
>
> Thanks
>
> Sunita
>
>
>
>
>
> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Attached is the edited code. Am I heading in right direction? Also, I am
>> missing something due to which, it seems to work well as long as the
>> application is running and the files are created right. But as soon as I
>> restart the application, it goes back to fromOffset as 0. Any thoughts?
>>
>> regards
>> Sunita
>>
>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
>> wrote:
>>
>>> Thanks for confirming Cody.
>>> To get to use the library, I had to do:
>>>
>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
>>> "/consumers/topics/"+ topics + "/0")
>>>
>>> It worked well. However, I had to specify the partitionId in the
>>> zkPath.  If I want the library to pick all the partitions for a topic,
>>> without me specifying the path, is it possible out of the box or I need to
>>> tweak?
>>>
>>> regards
>>> Sunita
>>>
>>>
>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> You are correct that you shouldn't have to worry about broker id.
>>>>
>>>> I'm honestly not sure specifically what else you are asking at this
>>>> point.
>>>>
>>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>>>> wrote:
>>>> > Just re-read the kafka architecture. Something that slipped my mind
>>>> is, it
>>>> > is leader based. So topic/partitionId pair will be same on all the
>>>> brokers.
>>

Re: HiveContext is Serialized?

2016-10-25 Thread Sunita Arvind
Thanks for the response Sean. I have seen the NPE on similar issues very
consistently and assumed that could be the reason :) Thanks for clarifying.
regards
Sunita

On Tue, Oct 25, 2016 at 10:11 PM, Sean Owen <so...@cloudera.com> wrote:

> This usage is fine, because you are only using the HiveContext locally on
> the driver. It's applied in a function that's used on a Scala collection.
>
> You can't use the HiveContext or SparkContext in a distribution operation.
> It has nothing to do with for loops.
>
> The fact that they're serializable is misleading. It's there, I believe,
> because these objects may be inadvertently referenced in the closure of a
> function that executes remotely, yet doesn't use the context. The closure
> cleaner can't always remove this reference. The task would fail to
> serialize even though it doesn't use the context. You will find these
> objects serialize but then don't work if used remotely.
>
> The NPE you see is an unrelated cosmetic problem that was fixed in 2.0.1
> IIRC.
>
>
> On Wed, Oct 26, 2016 at 4:28 AM Ajay Chander <itsche...@gmail.com> wrote:
>
>> Hi Everyone,
>>
>> I was thinking if I can use hiveContext inside foreach like below,
>>
>> object Test {
>>   def main(args: Array[String]): Unit = {
>>
>> val conf = new SparkConf()
>> val sc = new SparkContext(conf)
>> val hiveContext = new HiveContext(sc)
>>
>> val dataElementsFile = args(0)
>> val deDF = 
>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>
>> def calculate(de: Row) {
>>   val dataElement = de.getAs[String]("DataElement").trim
>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>> TEST_DB.TEST_TABLE1 ")
>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>> }
>>
>> deDF.collect().foreach(calculate)
>>   }
>> }
>>
>>
>> I looked at 
>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>  and I see it is extending SqlContext which extends Logging with 
>> Serializable.
>>
>> Can anyone tell me if this is the right way to use it ? Thanks for your time.
>>
>> Regards,
>>
>> Ajay
>>
>>


Re: HiveContext is Serialized?

2016-10-25 Thread Sunita Arvind
Ajay,

Afaik Generally these contexts cannot be accessed within loops. The sql
query itself would run on distributed datasets so it's a parallel
execution. Putting them in foreach would make it nested in nested. So
serialization would become hard. Not sure I could explain it right.

If you can create the dataframe in main, you can register it as a table and
run the queries in main method itself. You don't need to coalesce or run
the method within foreach.

Regards
Sunita

On Tuesday, October 25, 2016, Ajay Chander <itsche...@gmail.com> wrote:

>
> Jeff, Thanks for your response. I see below error in the logs. You think
> it has to do anything with hiveContext ? Do I have to serialize it before
> using inside foreach ?
>
> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener
> threw an exception
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(
> SQLListener.scala:167)
> at org.apache.spark.scheduler.SparkListenerBus$class.onPostEven
> t(SparkListenerBus.scala:42)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveL
> istenerBus.scala:31)
> at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveL
> istenerBus.scala:31)
> at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBu
> s.scala:55)
> at org.apache.spark.util.AsynchronousListenerBus.postToAll(Asyn
> chronousListenerBus.scala:37)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Asynchro
> nousListenerBus.scala:80)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousLis
> tenerBus.scala:65)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousLis
> tenerBus.scala:65)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$
> anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.sca
> la:1181)
> at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>
> Thanks,
> Ajay
>
> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>>
>> In your sample code, you can use hiveContext in the foreach as it is
>> scala List foreach operation which runs in driver side. But you cannot use
>> hiveContext in RDD.foreach
>>
>>
>>
>> Ajay Chander <itsche...@gmail.com> wrote on Wed, Oct 26, 2016 at 11:28 AM:
>>
>>> Hi Everyone,
>>>
>>> I was thinking if I can use hiveContext inside foreach like below,
>>>
>>> object Test {
>>>   def main(args: Array[String]): Unit = {
>>>
>>> val conf = new SparkConf()
>>> val sc = new SparkContext(conf)
>>> val hiveContext = new HiveContext(sc)
>>>
>>> val dataElementsFile = args(0)
>>> val deDF = 
>>> hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>>
>>> def calculate(de: Row) {
>>>   val dataElement = de.getAs[String]("DataElement").trim
>>>   val df1 = hiveContext.sql("SELECT cyc_dt, supplier_proc_i, '" + 
>>> dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM 
>>> TEST_DB.TEST_TABLE1 ")
>>>   df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>> }
>>>
>>> deDF.collect().foreach(calculate)
>>>   }
>>> }
>>>
>>>
>>> I looked at 
>>> https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
>>>  and I see it is extending SqlContext which extends Logging with 
>>> Serializable.
>>>
>>> Can anyone tell me if this is the right way to use it ? Thanks for your 
>>> time.
>>>
>>> Regards,
>>>
>>> Ajay
>>>
>>>
>


Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
The error in the file I just shared is here:

val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition._2(0)
// this was just `partition` before, and hence there was an error fetching the offset

Still testing. Somehow, Cody, your code never led to "file already exists"
sorts of errors (I am saving the output of the dstream as a parquet file after
converting it to a dataframe; the batch interval will be 2 hrs).

The code in the main is here:

  val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
    conf.getString("groupId"), conf.getString("topics"))
  val storedOffsets = offsetsStore.readOffsets()
  LogHandler.log.info("Fetched the offset from zookeeper")

  val kafkaArr = storedOffsets match {
    case None =>
      // start from the initial offsets
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaProps, Set(topics))

    case Some(fromOffsets) =>
      // start from previously saved offsets
      val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
        (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
        Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)

      // KafkaUtils.createRDD[String, Row, StringDecoder, ProtobufMessage, (String, Row)](
      //   sc, kafkaProps, fromOffsets, messageHandler)
  }

  kafkaArr.foreachRDD { (rdd, time) =>
    val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema)
      .dataType.asInstanceOf[StructType]
    val ardd: RDD[Row] = rdd.mapPartitions { itr =>
      itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
    }
    val df = sql.createDataFrame(ardd, schema)
    LogHandler.log.info("Created dataframe")
    val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
    LogHandler.log.info("Saved offset to Zookeeper")
    df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
    LogHandler.log.info("Created the parquet file")
  }

Thanks

Sunita





On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Attached is the edited code. Am I heading in right direction? Also, I am
> missing something due to which, it seems to work well as long as the
> application is running and the files are created right. But as soon as I
> restart the application, it goes back to fromOffset as 0. Any thoughts?
>
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Thanks for confirming Cody.
>> To get to use the library, I had to do:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
>> "/consumers/topics/"+ topics + "/0")
>>
>> It worked well. However, I had to specify the partitionId in the zkPath.
>> If I want the library to pick all the partitions for a topic, without me
>> specifying the path, is it possible out of the box or I need to tweak?
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> You are correct that you shouldn't have to worry about broker id.
>>>
>>> I'm honestly not sure specifically what else you are asking at this
>>> point.
>>>
>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>> > Just re-read the kafka architecture. Something that slipped my mind
>>> is, it
>>> > is leader based. So topic/partitionId pair will be same on all the
>>> brokers.
>>> > So we do not need to consider brokerid while storing offsets. Still
>>> > exploring rest of the items.
>>> > regards
>>> > Sunita
>>> >
>>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com
>>> >
>>> > wrote:
>>> >>
>>> >> Hello Experts,
>>> >>
>>> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
>>> >> that it is old approach. Any reasons for that? Any issues observed
>>> with
>>> >> saving to ZK. The way we are planning to use it is:
>>> >> 1. Following
>>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>>> g-zero-data-loss.html
>>> >> 2. Saving to the same file with offsetRange as a part of the file. We
>>> hope
>>> >> tha

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Attached is the edited code. Am I heading in the right direction? Also, I am
missing something due to which it seems to work well as long as the
application is running and the files are created correctly, but as soon as I
restart the application, it goes back to a fromOffset of 0. Any thoughts?

regards
Sunita

On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Thanks for confirming Cody.
> To get to use the library, I had to do:
>
> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> "/consumers/topics/"+ topics + "/0")
>
> It worked well. However, I had to specify the partitionId in the zkPath.
> If I want the library to pick all the partitions for a topic, without me
> specifying the path, is it possible out of the box or I need to tweak?
>
> regards
> Sunita
>
>
> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You are correct that you shouldn't have to worry about broker id.
>>
>> I'm honestly not sure specifically what else you are asking at this point.
>>
>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>> wrote:
>> > Just re-read the kafka architecture. Something that slipped my mind is,
>> it
>> > is leader based. So topic/partitionId pair will be same on all the
>> brokers.
>> > So we do not need to consider brokerid while storing offsets. Still
>> > exploring rest of the items.
>> > regards
>> > Sunita
>> >
>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com>
>> > wrote:
>> >>
>> >> Hello Experts,
>> >>
>> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
>> >> that it is old approach. Any reasons for that? Any issues observed with
>> >> saving to ZK. The way we are planning to use it is:
>> >> 1. Following
>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>> g-zero-data-loss.html
>> >> 2. Saving to the same file with offsetRange as a part of the file. We
>> hope
>> >> that there are no partial writes/ overwriting is possible and
>> offsetRanges
>> >>
>> >> However I have below doubts which I couldnt figure out from the code
>> here
>> >> -
>> >> https://github.com/ippontech/spark-kafka-source/blob/master/
>> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>> >> 1. The brokerId is not part of the OffsetRange. How will just the
>> >> partitionId:FromOffset stay unique in a cluster with multiple brokers
>> and
>> >> multiple partitions/topic.
>> >> 2. Do we have to specify zkPath to include the partitionid. I tried
>> using
>> >> the ZookeeperOffsetStore as is and it required me to specify the
>> >> partitionId:
>> >>
>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.get
>> String("zkHosts"),
>> >> "/consumers/topics/"+ topics + "/0")
>> >>
>> >> For our usecases it is too limiting to include partitionId in the path.
>> >> To get it to work by automatically detecting the existing partitions
>> for a
>> >> given topic, I changed it as below (inspired from
>> >> http://www.programcreek.com/java-api-examples/index.php?api=
>> kafka.utils.ZKGroupTopicDirs):
>> >>
>> >> /**
>> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>> >>   * groupID consumer group to get offsets for
>> >>   * topic topic to get offsets for
>> >>   * return - mapping of (topic and) partition to offset
>> >>   */
>> >> private def getOffsets(groupID :String, topic: String):Option[String]
>> = {
>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>> >>   val topicSeq = List(topic).toSeq
>> >>  // try {
>> >> val partitions = ZkUtils.getPartitionsForTopics(zkClient,
>> topicSeq)
>> >> var partition:Object=null
>> >> for (partition <- partitions) {
>> >>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
>> "/" +
>> >> partition;
>> >>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkCl
>> ient,
>> >> partitionOffsetPath)._1;
>> >>   val offset:Long = if(mayb

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Thanks for confirming, Cody.
To get the library to work, I had to do:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  "/consumers/topics/" + topics + "/0")

It worked well. However, I had to specify the partitionId in the zkPath.
If I want the library to pick up all the partitions for a topic without me
specifying the path, is that possible out of the box, or do I need to tweak it?

regards
Sunita


On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You are correct that you shouldn't have to worry about broker id.
>
> I'm honestly not sure specifically what else you are asking at this point.
>
> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
> > Just re-read the kafka architecture. Something that slipped my mind is,
> it
> > is leader based. So topic/partitionId pair will be same on all the
> brokers.
> > So we do not need to consider brokerid while storing offsets. Still
> > exploring rest of the items.
> > regards
> > Sunita
> >
> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com>
> > wrote:
> >>
> >> Hello Experts,
> >>
> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
> >> that it is old approach. Any reasons for that? Any issues observed with
> >> saving to ZK. The way we are planning to use it is:
> >> 1. Following
> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-
> achieving-zero-data-loss.html
> >> 2. Saving to the same file with offsetRange as a part of the file. We
> hope
> >> that there are no partial writes/ overwriting is possible and
> offsetRanges
> >>
> >> However I have below doubts which I couldnt figure out from the code
> here
> >> -
> >> https://github.com/ippontech/spark-kafka-source/blob/
> master/src/main/scala/com/ippontech/kafka/stores/
> ZooKeeperOffsetsStore.scala
> >> 1. The brokerId is not part of the OffsetRange. How will just the
> >> partitionId:FromOffset stay unique in a cluster with multiple brokers
> and
> >> multiple partitions/topic.
> >> 2. Do we have to specify zkPath to include the partitionid. I tried
> using
> >> the ZookeeperOffsetStore as is and it required me to specify the
> >> partitionId:
> >>
> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
> >> "/consumers/topics/"+ topics + "/0")
> >>
> >> For our usecases it is too limiting to include partitionId in the path.
> >> To get it to work by automatically detecting the existing partitions
> for a
> >> given topic, I changed it as below (inspired from
> >> http://www.programcreek.com/java-api-examples/index.php?
> api=kafka.utils.ZKGroupTopicDirs):
> >>
> >> /**
> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
> >>   * groupID consumer group to get offsets for
> >>   * topic topic to get offsets for
> >>   * return - mapping of (topic and) partition to offset
> >>   */
> >> private def getOffsets(groupID :String, topic: String):Option[String] =
> {
> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
> >>   val topicSeq = List(topic).toSeq
> >>  // try {
> >> val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
> >> var partition:Object=null
> >> for (partition <- partitions) {
> >>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
> "/" +
> >> partition;
> >>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(
> zkClient,
> >> partitionOffsetPath)._1;
> >>   val offset:Long = if(maybeOffset.isDefined) maybeOffset.get.toLong
> >> else 0L;
> >>   val topicAndPartition:TopicAndPartition  = new
> >> TopicAndPartition(topic, Integer.parseInt(partition.toString));
> >>   offsets.put(topicAndPartition, offset)
> >> }
> >>   //}
> >> Option(offsets.mkString(","))
> >> }
> >>
> >> // Read the previously saved offsets from Zookeeper
> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
> >>
> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
> >>
> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
> >>   val start = System.currentTimeMillis()
> >>   of

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Just re-read the Kafka architecture. Something that slipped my mind is that
it is leader based, so a topic/partitionId pair will be the same on all the
brokers. So we do not need to consider the brokerId while storing offsets.
Still exploring the rest of the items.
regards
Sunita

On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hello Experts,
>
> I am trying to use the saving to ZK design. Just saw Sudhir's comments
> that it is old approach. Any reasons for that? Any issues observed with
> saving to ZK. The way we are planning to use it is:
> 1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
> g-zero-data-loss.html
> 2. Saving to the same file with offsetRange as a part of the file. We hope
> that there are no partial writes/ overwriting is possible and offsetRanges
>
> However I have below doubts which I couldnt figure out from the code here
> - https://github.com/ippontech/spark-kafka-source/blob/master/
> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
> 1. The brokerId is not part of the OffsetRange. How will just the
> partitionId:FromOffset stay unique in a cluster with multiple brokers and
> multiple partitions/topic.
> 2. Do we have to specify zkPath to include the partitionid. I tried using
> the ZookeeperOffsetStore as is and it required me to specify the
> partitionId:
>
> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> "/consumers/topics/"+ topics + "/0")
>
> For our usecases it is too limiting to include partitionId in the path.
> To get it to work by automatically detecting the existing partitions for a 
> given topic, I changed it as below (inspired from 
> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>
> /**
>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>   * groupID consumer group to get offsets for
>   * topic topic to get offsets for
>   * return - mapping of (topic and) partition to offset
>   */
> private def getOffsets(groupID :String, topic: String):Option[String] = {
>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>   val topicSeq = List(topic).toSeq
>  // try {
> val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
> var partition:Object=null
> for (partition <- partitions) {
>   val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" + 
> partition;
>   val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkClient, 
> partitionOffsetPath)._1;
>   val offset:Long = if(maybeOffset.isDefined) maybeOffset.get.toLong else 
> 0L;
>   val topicAndPartition:TopicAndPartition  = new TopicAndPartition(topic, 
> Integer.parseInt(partition.toString));
>   offsets.put(topicAndPartition, offset)
> }
>   //}
> Option(offsets.mkString(","))
> }
>
> // Read the previously saved offsets from Zookeeper
> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>
>   LogHandler.log.info("Reading offsets from ZooKeeper")
>
>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>   val start = System.currentTimeMillis()
>   offsetsRangesStrOpt match {
> case Some(offsetsRangesStr) =>
>   LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>
>   val offsets = offsetsRangesStr.split(",")
> .map(s => s.split(":"))
> .map { case Array(partitionStr, offsetStr) => 
> (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
> .toMap
>
>   LogHandler.log.info("Done reading offsets from ZooKeeper. Took " + 
> (System.currentTimeMillis() - start))
>
>   Some(offsets)
> case None =>
>   LogHandler.log.info("No offsets found in ZooKeeper. Took " + 
> (System.currentTimeMillis() - start))
>   None
>   }
>
> }
>
> However, I am concerned if the saveOffsets will work well with this
> approach. Thats when I realized we are not considering brokerIds which
> storing offsets and probably the OffsetRanges does not have it either. It
> can only provide Topic, partition, from and until offsets.
>
> I am probably missing something very basic. Probably the library works
> well by itself. Can someone/ Cody explain?
>
> Cody, Thanks a lot for sharing your work.
>
> regards
> Sunita
>
>
> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> See
>> https://github.com/koeninger/kafka-exactly-once
>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed

Re: Zero Data Loss in Spark with Kafka

2016-10-25 Thread Sunita Arvind
Hello Experts,

I am trying to use the saving-to-ZK design. I just saw Sudhir's comments that
it is an old approach. Any reasons for that? Any issues observed with saving
to ZK? The way we are planning to use it is:
1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
2. Saving to the same file with the offsetRange as part of the file name. We
hope that no partial writes or overwriting of the offsetRanges is possible.

However, I have the below doubts which I couldn't figure out from the code here:
https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
1. The brokerId is not part of the OffsetRange. How will just the
partitionId:FromOffset stay unique in a cluster with multiple brokers and
multiple partitions per topic?
2. Do we have to specify the zkPath to include the partitionId? I tried using
the ZooKeeperOffsetsStore as-is and it required me to specify the
partitionId:

val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
  "/consumers/topics/" + topics + "/0")

For our use cases it is too limiting to include the partitionId in the path.
To get it to work by automatically detecting the existing partitions for a
given topic, I changed it as below (inspired by
http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):

/**
  * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
  * groupID consumer group to get offsets for
  * topic topic to get offsets for
  * return - mapping of (topic and) partition to offset
  */
private def getOffsets(groupID: String, topic: String): Option[String] = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  val offsets = new mutable.HashMap[TopicAndPartition, Long]()
  val topicSeq = List(topic).toSeq
  // try {
  val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
  var partition: Object = null
  for (partition <- partitions) {
    val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition
    val maybeOffset: Option[String] = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
    val offset: Long = if (maybeOffset.isDefined) maybeOffset.get.toLong else 0L
    val topicAndPartition: TopicAndPartition =
      new TopicAndPartition(topic, Integer.parseInt(partition.toString))
    offsets.put(topicAndPartition, offset)
  }
  // }
  Option(offsets.mkString(","))
}

// Read the previously saved offsets from ZooKeeper
override def readOffsets: Option[Map[TopicAndPartition, Long]] = {

  LogHandler.log.info("Reading offsets from ZooKeeper")

  val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
  val start = System.currentTimeMillis()
  offsetsRangesStrOpt match {
    case Some(offsetsRangesStr) =>
      LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")

      val offsets = offsetsRangesStr.split(",")
        .map(s => s.split(":"))
        .map { case Array(partitionStr, offsetStr) =>
          (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong)
        }
        .toMap

      LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
        (System.currentTimeMillis() - start))
      Some(offsets)

    case None =>
      LogHandler.log.info("No offsets found in ZooKeeper. Took " +
        (System.currentTimeMillis() - start))
      None
  }
}
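
To pair with the readOffsets above, I would expect the saveOffsets side to write
each partition's untilOffset back to the same znodes. A rough, untested sketch,
assuming the same Kafka 0.8-style ZkUtils/ZKGroupTopicDirs API used above, the
zkClient and topic already in scope in the store class, and spark-streaming-kafka's
HasOffsetRanges (this is not the library's own code):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

def saveOffsets(groupID: String, rdd: RDD[_]): Unit = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  // offsets are tracked per (topic, partition); no broker id is needed, since a
  // partition's leader can move between brokers without changing the offsets
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { osr =>
    val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + osr.partition
    ZkUtils.updatePersistentPath(zkClient, partitionOffsetPath, osr.untilOffset.toString)
  }
}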

However, I am concerned about whether saveOffsets will work well with this
approach. That's when I realized we are not considering brokerIds when
storing offsets, and OffsetRange probably does not carry them either. It can
only provide the topic, partition, fromOffset and untilOffset.

I am probably missing something very basic; probably the library works well
by itself. Can someone (Cody?) explain?

Cody, Thanks a lot for sharing your work.

regards
Sunita

On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org> wrote:

> See
> https://github.com/koeninger/kafka-exactly-once
> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com>
> wrote:
>
>> Hi Experts,
>>
>> I am looking for some information on how to acheive zero data loss while
>> working with kafka and Spark. I have searched online and blogs have
>> different answer. Please let me know if anyone has idea on this.
>>
>> Blog 1:
>> https://databricks.com/blog/2015/01/15/improved-driver-fault
>> -tolerance-and-zero-data-loss-in-spark-streaming.html
>>
>>
>> Blog2:
>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>> g-zero-data-loss.html
>>
>>
>> Blog one simply says configuration change with checkpoint directory and
>> blog 2 give details about on how to save offsets to zoo keeper. can you
>> please help me out with right approach.
>>
>> Thanks,
>> Asmath
>>
>>
>>


Spark writing to elasticsearch asynchronously

2016-09-21 Thread Sunita Arvind
Hello Experts,

Is there a way to get Spark to write to Elasticsearch asynchronously?
The details are below:
http://stackoverflow.com/questions/39624538/spark-savetoes-asynchronously

regards
Sunita


Increasing spark.yarn.executor.memoryOverhead degrades performance

2016-07-18 Thread Sunita Arvind
Hello Experts,

For one of our streaming applications, we intermittently saw:

WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory
limits. 12.0 GB of 12 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

Based on what I found on the internet and the error message, I increased the
memoryOverhead to 768. This is actually slowing the application down. We are on
Spark 1.3, so I am not sure if it is due to GC pauses. To make my trials more
informed, I want to understand what could be causing the degradation. Should I
also increase the driver memoryOverhead? Another interesting observation:
bringing the executor memory down to 5 GB with an executor memoryOverhead of
768 showed significant performance gains. What are the
other associated settings?
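
For reference, these are the knobs I am referring to, set programmatically
(the values are just the ones from my trials, not a recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "5g")                 // executor heap
  .set("spark.yarn.executor.memoryOverhead", "768")   // off-heap headroom per executor, in MB
  .set("spark.yarn.driver.memoryOverhead", "768")     // the driver-side counterpart I am unsure about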

regards
Sunita


Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-14 Thread Sunita Arvind
Thank you for your inputs. Will test it out and share my findings



On Thursday, July 14, 2016, CosminC  wrote:

> Didn't have the time to investigate much further, but the one thing that
> popped out is that partitioning was no longer working on 1.6.1. This would
> definitely explain the 2x performance loss.
>
> Checking 1.5.1 Spark logs for the same application showed that our
> partitioner was working correctly, and after the DStream / RDD creation a
> user session was only processed on a single machine. Running on top of
> 1.6.1
> though, the session was processed on up to 4 machines, in a 5 node cluster
> including the driver, with a lot of redundant operations. We use a custom
> but very simple partitioner which extends HashPartitioner. It partitions on
> a case class which has a single string parameter.
>
> Speculative operations are turned off by default, and we never enabled it,
> so it's not that.
>
> Right now we're postponing any Spark upgrade, and we'll probably try to
> upgrade directly to Spark 2.0, hoping the partitioning issue is no longer
> present there.
>
>
>
>
>
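
(For context, the kind of partitioner described above would look roughly like the
sketch below; this is hypothetical, not the poster's actual code, and the names
are illustrative.)

import org.apache.spark.HashPartitioner

case class SessionKey(sessionId: String)

class SessionPartitioner(partitions: Int) extends HashPartitioner(partitions) {
  override def getPartition(key: Any): Int = key match {
    case SessionKey(id) => super.getPartition(id)   // hash only on the single string field
    case other          => super.getPartition(other)
  }
}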


Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-13 Thread Sunita
I am facing the same issue. Upgrading to Spark 1.6 is causing a huge performance
loss. Were you able to solve this issue? I am also experimenting with the memory
settings mentioned at
http://spark.apache.org/docs/latest/configuration.html#memory-management

but they are not making a lot of difference. I would appreciate your inputs on this.






Maintain complete state for updateStateByKey

2016-07-06 Thread Sunita Arvind
Hello Experts,

I have a requirement to maintain a list of ids for every customer across
all of time. I should be able to provide a count of distinct ids on demand. All
the examples I have seen so far indicate I need to maintain the counts
directly. My concern is that I will not be able to identify cumulative distinct
values in that case. Also, would maintaining such a huge state be tolerable
to the framework?

Here is what I am attempting:

val updateUniqueids: (Seq[String], Option[RDD[String]]) => Option[RDD[String]] =
  (values: Seq[String], state: Option[RDD[String]]) => {
    val currentLst = values.distinct
    val previousLst: RDD[String] = state.getOrElse().asInstanceOf[RDD[String]]
    Some(currentLst.union(previousLst).distinct)
  }

Another challenge is concatenating RDD[String] and Seq[String] without
being able to access the SparkContext, as this function has to adhere to

updateFunc: (Seq[V], Option[S]) => Option[S]

I am also trying to figure out if I can use the
(iterator: Iterator[(K, Seq[V], Option[S])]) variant, but haven't figured it out yet.

Appreciate any suggestions in this regard.

regards

Sunita

P.S.:
I am aware of mapWithState, but we are not on the latest version as of now.
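
For reference, a minimal sketch of the alternative I am considering: keeping the
ids themselves (rather than counts) in the state as a Set[String] per customer.
This is untested and assumes the per-customer id set fits in executor memory and
that checkpointing is enabled; the stream and names are illustrative:

// idsByCustomer: DStream[(String, String)] of (customerId, id) pairs -- illustrative
val updateUniqueIds: (Seq[String], Option[Set[String]]) => Option[Set[String]] =
  (newIds, existing) => Some(existing.getOrElse(Set.empty[String]) ++ newIds)

val idsState = idsByCustomer.updateStateByKey(updateUniqueIds)
val distinctCounts = idsState.mapValues(_.size)   // distinct id count per customer, on demand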


Re: NullPointerException when starting StreamingContext

2016-06-24 Thread Sunita Arvind
I was able to resolve the serialization issue. The root cause was that I was
accessing the config values within foreachRDD{}.
The solution was to extract the values from the config outside the foreachRDD
scope and pass the plain values into the loop directly. It is probably something
obvious, as we cannot have nested distributed data sets. Mentioning it here for the
benefit of anyone else stumbling upon the same issue.
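
Roughly, the pattern is the following (a hypothetical sketch; the config keys and
the sink are illustrative, not my actual code):

// extract plain, serializable values from the Typesafe Config on the driver
val topic    = custConf.getString("topic")
val sinkPath = custConf.getString("sinkPath")

fullStream.foreachRDD { rdd =>
  // only the extracted strings are captured in the closure, not the Config object itself
  rdd.saveAsTextFile(sinkPath + "/" + topic)
}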

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR streaming.StreamingContext: Error starting the 
> context, marking it as stopped
> java.lang.NullPointerException
>   at 
> com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202)
>   at 
> com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228)
>   at 
> com.typesafe.config.ConfigException.writeObject(ConfigException.java:58)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
>   at java.lang.Throwable.writeObject(Throwable.java:985)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>   at 
> java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at 
> org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:141)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251)
>   at 
> org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:142)
>   at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:554)
>   at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
>   at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:73)
>   at 
> com.edgecast.engine.ProcessingEngine$$anonfun$main$1.apply(ProcessingEngine.scala:67)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:67)
>   at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
>
>
> It seems to be a typical issue. All I am doing here is as below:
>
> Object ProcessingEngine{
>
> def initializeSpark(customer:String):StreamingContext={
>   LogHandler.log.info("InitialeSpark")
>   val custConf = ConfigFactory.load(customer + 
> ".conf").getConfig(customer).withFallback(AppConf)
>   implicit val sparkConf: SparkConf = new SparkConf().setAppName(cust

Re: NullPointerException when starting StreamingContext

2016-06-23 Thread Sunita Arvind
Also, just to keep it simple, I am trying to use 1.6.0-cdh5.7.0 in the
pom.xml, as the cluster I am trying to run on is CDH 5.7.0 with Spark 1.6.0.
Here is my pom setting:


<properties>
  <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
</properties>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${cdh.spark.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
  <scope>compile</scope>
</dependency>

But trying to execute the application throws errors like below:
Exception in thread "main" java.lang.NoClassDefFoundError:
kafka/cluster/BrokerEndPoint
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at
org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at
org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at
org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at
org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:150)
at
org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
at
org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at
com.edgecast.engine.ConcurrentOps$.createDataStreamFromKafka(ConcurrentOps.scala:68)
at
com.edgecast.engine.ConcurrentOps$.startProcessing(ConcurrentOps.scala:32)
at com.edgecast.engine.ProcessingEngine$.main(ProcessingEngine.scala:33)
at com.edgecast.engine.ProcessingEngine.main(ProcessingEngine.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: kafka.cluster.BrokerEndPoint
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 38 more
16/06/23 11:09:53 INFO SparkContext: Invoking stop() from shutdown hook


I've tried Kafka versions 0.8.2.0, 0.8.2.2 and 0.9.0.0. With 0.9.0.0 the
processing hangs much sooner.
Can someone help with this error?

regards
Sunita

On Wed, Jun 22, 2016 at 8:20 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Hello Experts,
>
> I am getting this error repeatedly:
>
> 16/06/23 03:06:59 ERROR st

NullPointerException when starting StreamingContext

2016-06-22 Thread Sunita Arvind
f = ConfigFactory.load()
  LogHandler.log.info("Starting the processing Engine")
  getListOfCustomers().foreach { cust =>
    implicit val ssc = initializeSpark(cust)
    val FullStream = createDataStreamFromKafka(cust, ssc)
    ConcurrentOps.determineAllSinks(cust, FullStream)
    FullStream.checkpoint(Duration(AppConf.getLong("batchDurSec") * 2000))
    ssc.start()
    ssc.awaitTermination()
  }
}
}

I also tried putting all the initialization directly in main (not using
method calls for initializeSpark and createDataStreamFromKafka), and also
not putting it in foreach, creating a single Spark and streaming context.
However, the error persists.

Appreciate any help.

regards
Sunita


Seeking advice on realtime querying over JDBC

2016-06-02 Thread Sunita Arvind
Hi Experts,

We are trying to get a Kafka stream ingested in Spark and expose the
registered table over JDBC for querying. Here are some questions:
1. Spark Streaming supports a single context per application, right? If I have
multiple customers and would like to create a Kafka topic for each of them
and one streaming context for every topic, is this doable? As per the current
Spark documentation,
http://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing-streamingcontext
I can have only one active streaming context at a time. Is there no way
around that? The use case here is: if I am looking at a 5 min window, the
window should have records for that customer only, which is possible only
by having a customer-specific streaming context.

2. If I am able to create multiple contexts in this fashion, can I register
them as temp tables in my application and expose them over JDBC? Going by
https://forums.databricks.com/questions/1464/how-to-configure-thrift-server-to-use-a-custom-spa.html,
it looks like I can connect the thrift server to a single Spark SQL context.
Does having multiple streaming contexts mean I automatically have multiple SQL
contexts?

3. Can I use SQLContext, or do I need a HiveContext, in order to see
the tables registered via the Spark application through JDBC?
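
For questions 2 and 3, the pattern I have in mind is sharing one HiveContext
between the streaming job and the JDBC endpoint via
HiveThriftServer2.startWithContext (from the spark-hive-thriftserver module).
An untested sketch, with illustrative schema, parser and table names:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

case class Event(customerId: String, value: Double)   // illustrative schema

val hiveContext = new HiveContext(ssc.sparkContext)
HiveThriftServer2.startWithContext(hiveContext)        // temp tables on this context become visible over JDBC

kafkaStream.map(parseEvent).foreachRDD { rdd =>        // parseEvent: String => Event (hypothetical)
  hiveContext.createDataFrame(rdd).registerTempTable("customer_events")
}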

regards
Sunita


Re: Adhoc queries on Spark 2.0 with Structured Streaming

2016-05-06 Thread Sunita Arvind
Thanks for the clarification, Michael, and good luck with Spark 2.0. It
really looks promising.

I am especially interested in the ad hoc queries aspect. That is probably what
is being referred to as Continuous SQL in the slides. What is the timeframe
for the availability of this functionality?

regards
Sunita

On Fri, May 6, 2016 at 2:24 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> That is a forward looking design doc and not all of it has been
> implemented yet.  With Spark 2.0 the main sources and sinks will be file
> based, though we hope to quickly expand that now that a lot of
> infrastructure is in place.
>
> On Fri, May 6, 2016 at 2:11 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> I was
>> reading 
>> StructuredStreamingProgrammingAbstractionSemanticsandAPIs-ApacheJIRA.pdf
>> attached to SPARK-8360
>>
>> On page 12, there was mentioning of .format(“kafka”) but I searched the
>> codebase and didn't find any occurrence.
>>
>> FYI
>>
>> On Fri, May 6, 2016 at 1:06 PM, Michael Malak <
>> michaelma...@yahoo.com.invalid> wrote:
>>
>>> At first glance, it looks like the only streaming data sources available
>>> out of the box from the github master branch are
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
>>>  and
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
>>>  .
>>> Out of the Jira epic for Structured Streaming
>>> https://issues.apache.org/jira/browse/SPARK-8360 it would seem the
>>> still-open https://issues.apache.org/jira/browse/SPARK-10815 "API
>>> design: data sources and sinks" is relevant here.
>>>
>>> In short, it would seem the code is not there yet to create a Kafka-fed
>>> Dataframe/Dataset that can be queried with Structured Streaming; or if it
>>> is, it's not obvious how to write such code.
>>>
>>>
>>> --
>>> *From:* Anthony May <anthony...@gmail.com>
>>> *To:* Deepak Sharma <deepakmc...@gmail.com>; Sunita Arvind <
>>> sunitarv...@gmail.com>
>>> *Cc:* "user@spark.apache.org" <user@spark.apache.org>
>>> *Sent:* Friday, May 6, 2016 11:50 AM
>>> *Subject:* Re: Adhoc queries on Spark 2.0 with Structured Streaming
>>>
>>> Yeah, there isn't even a RC yet and no documentation but you can work
>>> off the code base and test suites:
>>> https://github.com/apache/spark
>>> And this might help:
>>>
>>> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
>>>
>>> On Fri, 6 May 2016 at 11:07 Deepak Sharma <deepakmc...@gmail.com> wrote:
>>>
>>> Spark 2.0 is yet to come out for public release.
>>> I am waiting to get hands on it as well.
>>> Please do let me know if i can download source and build spark2.0 from
>>> github.
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Fri, May 6, 2016 at 9:51 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>>
>>> Hi All,
>>>
>>> We are evaluating a few real time streaming query engines and spark is
>>> my personal choice. The addition of adhoc queries is what is getting me
>>> further excited about it, however the talks I have heard so far only
>>> mention about it but do not provide details. I need to build a prototype to
>>> ensure it works for our use cases.
>>>
>>> Can someone point me to relevant material for this.
>>>
>>> regards
>>> Sunita
>>>
>>>
>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>>
>>>
>>>
>>
>


Re: Adhoc queries on Spark 2.0 with Structured Streaming

2016-05-06 Thread Sunita Arvind
Agreed.
Just sharing what I saw,
http://www.slideshare.net/databricks/realtime-spark-from-interactive-queries-to-streaming

http://www.slideshare.net/rxin/the-future-of-realtime-in-spark?next_slideshow=3

It claims to support Kafka, files and databases. However, Continuous SQL
will only be available in 2.1 or later.

regards
Sunita


On Fri, May 6, 2016 at 1:06 PM, Michael Malak <michaelma...@yahoo.com>
wrote:

> At first glance, it looks like the only streaming data sources available
> out of the box from the github master branch are
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
>  and
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
>  .
> Out of the Jira epic for Structured Streaming
> https://issues.apache.org/jira/browse/SPARK-8360 it would seem the
> still-open https://issues.apache.org/jira/browse/SPARK-10815 "API design:
> data sources and sinks" is relevant here.
>
> In short, it would seem the code is not there yet to create a Kafka-fed
> Dataframe/Dataset that can be queried with Structured Streaming; or if it
> is, it's not obvious how to write such code.
>
>
> --
> *From:* Anthony May <anthony...@gmail.com>
> *To:* Deepak Sharma <deepakmc...@gmail.com>; Sunita Arvind <
> sunitarv...@gmail.com>
> *Cc:* "user@spark.apache.org" <user@spark.apache.org>
> *Sent:* Friday, May 6, 2016 11:50 AM
> *Subject:* Re: Adhoc queries on Spark 2.0 with Structured Streaming
>
> Yeah, there isn't even a RC yet and no documentation but you can work off
> the code base and test suites:
> https://github.com/apache/spark
> And this might help:
>
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
>
> On Fri, 6 May 2016 at 11:07 Deepak Sharma <deepakmc...@gmail.com> wrote:
>
> Spark 2.0 is yet to come out for public release.
> I am waiting to get hands on it as well.
> Please do let me know if i can download source and build spark2.0 from
> github.
>
> Thanks
> Deepak
>
> On Fri, May 6, 2016 at 9:51 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
> Hi All,
>
> We are evaluating a few real time streaming query engines and spark is my
> personal choice. The addition of adhoc queries is what is getting me
> further excited about it, however the talks I have heard so far only
> mention about it but do not provide details. I need to build a prototype to
> ensure it works for our use cases.
>
> Can someone point me to relevant material for this.
>
> regards
> Sunita
>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
>
>
>


Adhoc queries on Spark 2.0 with Structured Streaming

2016-05-06 Thread Sunita Arvind
Hi All,

We are evaluating a few real-time streaming query engines and Spark is my
personal choice. The addition of ad hoc queries is what is getting me
further excited about it; however, the talks I have heard so far only
mention it and do not provide details. I need to build a prototype to
ensure it works for our use cases.

Can someone point me to relevant material for this?

regards
Sunita


Spark SQL - Registerfunction throwing MissingRequirementError in JavaMirror with primordial classloader

2015-04-26 Thread Sunita Arvind
Hi All,

I am trying to use a function within Spark SQL which accepts 2 - 4
arguments. I was able to get past the compilation errors; however, I see the
attached runtime exception when calling it from Spark SQL
(refer to the attachment for the complete stack trace - StackTraceFor_runTestInSQL).

The function itself works well when tried as a regular function.

Here is how I am trying it:

//Also tried without the defaulted  fmt parameters in the definition. The
issue persists.

def within10yrs(FromDT: String, ToDT: String,
                fmt1: DateTimeFormatter = dateFormats.MMDDHHMISS,
                fmt2: DateTimeFormatter = dateFormats.MMDD): Boolean = {
  println("Compute Within 10 years only if date2 is greater than date1")
  val yrsBetn = Years.yearsBetween(toDateTime(FromDT, fmt1),
    toDateTime(ToDT, fmt2)).getYears
  val in10 = yrsBetn match {
    case x if (x > 0 && x <= 10) => true
    case _ => false
  }

  println("FromDT =" + FromDT + "ToDT =" + ToDT + "within10years =" + in10 +
    " actual number of years is " + yrsBetn)
  in10
}


def runTestInSQL(sc: SparkContext): Unit = {
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
  import sqlContext._
  // Tried both of the below variants to register the function:
  // sqlContext.registerFunction("within10yrs",
  //   (date1: String, date2: String, fmt1: DateTimeFormatter, fmt2: DateTimeFormatter) =>
  //     within10yrs(date1, date2, fmt1, fmt2))
  sqlContext.registerFunction("within10yrs",
    (date1: String, date2: String) => within10yrs(date1, date2))

  val query1 = """select a.col1, a.col2, a.col3, b.col4, b.col5
                  FROM a JOIN b on a.col1 = b.col1
                  WHERE within10yrs(b.col4, a.col3)
                  GROUP BY a.col1,
                           a.col2,
                           a.col3,
                           b.col4,
                           b.col5"""

  // The table b in the query above is actually a nested query, and the SQL works
  // well without the WHERE within10yrs(b.col4, a.col3). Hence I am skipping the
  // details to keep the problem description simple.
  val res1 = sqlContext.sql(query1)
  res1.count() // Line number 70 as in the stack trace
}

Execution throws runtime exception:

//Stack trace available in the file named - StackTraceFor_runTestInSQL

def testasStdAlone() = {
  val date1 = "2005-07-18 00:00:00"
  val fmt1 = dateFormats.MMDDHHMISS
  val date2 = "20150719"
  val fmt2 = dateFormats.MMDD
  println("date1 is " + date1 + " format is " + dateFormats.MMDDHHMISS)
  println("date2 is " + date2 + " format is " + dateFormats.MMDD)
  //println("Within 10 years is " + within10yrs(date1, fmt1, date2, fmt2))
  println("Within 10 years is " + within10yrs(date1, date2))
}

def main(x: Array[String]): Unit = {
  val returned = testasStdAlone()
  //testRollup()
  println(returned)
}
Output as expected:

date1 is 2005-07-18 00:00:00 format is
org.joda.time.format.DateTimeFormatter@28d101f3
date2 is 20150719 format is org.joda.time.format.DateTimeFormatter@5e411af2
Within 10 years
FromDT =2005-07-18 00:00:00ToDT =20150719within10years =true actual number
of years is 10
Within 10 years is true
()


Appreciate any direction from the community.

regards
Sunita
Exception in thread main scala.reflect.internal.MissingRequirementError: 
class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with 
primordial classloader with boot classpath 
[C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\294\1\.cp\lib\scala-library.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\294\1\.cp\lib\scala-swing.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\294\1\.cp\lib\scala-actors.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\293\1\.cp\lib\scala-reflect.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\293\1\.cp\lib\scala-compiler.jar;C:\Java\jdk1.7.0_71\jre\lib\resources.jar;C:\Java\jdk1.7.0_71\jre\lib\rt.jar;C:\Java\jdk1.7.0_71\jre\lib\sunrsasign.jar;C:\Java\jdk1.7.0_71\jre\lib\jsse.jar;C:\Java\jdk1.7.0_71\jre\lib\jce.jar;C:\Java\jdk1.7.0_71\jre\lib\charsets.jar;C:\Java\jdk1.7.0_71\jre\lib\jfr.jar;C:\Java\jdk1.7.0_71\jre\classes]
 not found.
at 
scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
at 
scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
at 
scala.reflect.internal.Mirrors

Unable to broadcast dimension tables with Spark SQL

2015-02-16 Thread Sunita Arvind
Hi Experts,

I have a large table with 54 million records (fact table) being joined
with 6 small tables (dimension tables). The size on disk of the small tables is
within 5 KB and the record counts are in the range of 4 - 200.
All the worker nodes have 32 GB of RAM allocated for Spark. I have tried the
approaches below, and it looks like the small tables are not being broadcast,
which is causing timeouts as expected and failure of the job.
The reason for this, AFAIK, is that a small table also gets shuffled and
fits into a single node's partition. Then the large table is made to
flow to the same node, which stays busy while all other nodes are idle.

Note: The Spark version in use on the cluster as well as my local setup is
1.1.0. I also tried Spark 1.2.0 in the local setup; however, the
query plan showed no change.

1. Broadcast the RDD before registering it as a table:
 val k = sqlContext.parquetFile(p.fileName)
 val t = sc.broadcast(k)
 t.value.registerTempTable(p.tableName)

2. Set the variable
 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1")


3. Added a limit to each small table before registering it as a table. I guess
this gives the optimizer a way to compute statistics and determine that the other
table is small enough for broadcast:
   sqlContext.sql("select * from a_nolim limit 7").registerTempTable("edu")

  also tried DSL style:

 a.limit(7).registerTempTable("edu")

   Tried explicit broadcasting of the tables as below:

   sc.broadcast(sqlContext.sql("select * from edu_nolim limit 7")).value.registerTempTable("edu")

   and tried DSL style with the broadcast done on the RDD as well

4. Used DSL style of join:
   val try2 = a1.join(cdemo, LeftOuter, Some("dem.key1".attr === "ed.key1".attr))

5. Ran the below command in Hive for all small tables:
   ANALYZE TABLE tableName COMPUTE STATISTICS noscan

   Please note, the application uses SQLContext and not HiveContext. Hence I
ran the compute statistics outside the application, from the Hue Hive
editor. I am assuming the statistics are available in the metastore; however,
I am not sure if Spark can fetch these statistics since I am not using a
HiveContext within the application.

6. Not sure if these are valid flags, but tried with them set anyway:
  sqlContext.setConf("spark.sql.planner.dimensionJoin", "true")
  sqlContext.setConf("spark.sql.planner.generatedDimensionJoin", "true")
  sqlContext.setConf("multiWayJoin", "true")
  sqlContext.setConf("turbo", "true")

7. Tried cacheTable for all small tables. This changes the query execution
to InMemoryRelation instead of ParquetTableScan; however, the shuffle -
Exchange (HashPartitioning [i1_education_cust_demo#29], 200) - remains.

8. Reduced the shuffle partition number with this parameter -
sqlContext.setConf("spark.sql.shuffle.partitions", "8"). But this did not
help.

With all these attempts, the small tables are still getting shuffled, I
guess. Below are the query executions printed on every attempt; they have
remained almost the same on every attempt:

DSL Style execution plan(i.e.
rdd1.join(rdd2,LeftOuter,Some(rdd1.key.attr === rdd2.key.attr))
-
DSL Style execution plan -- HashOuterJoin [education#18],
[i1_education_cust_demo#29], LeftOuter, None
 Exchange (HashPartitioning [education#18], 200)
  ParquetTableScan [education#18,education_desc#19], (ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_education,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []
 Exchange (HashPartitioning [i1_education_cust_demo#29], 200)
  ParquetTableScan
[customer_id_cust_demo#20,age_dt_cust_demo#21,gndr_cd_cust_demo#22,hh_income_cust_demo#23,marital_status_cust_demo#24,ethnicity_cust_demo#25,length_of_residence_cust_demo#26,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,i1_education_cust_demo#29],
(ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_cust_demo_dm_sample,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []


SQL Style execution plan (i.e sqlContext.sql(select a,b,c,d,e from t1 left
outer join t2 on t1.a = t2.a)
--
Project
[customer_id_cust_demo#20,i1_education_cust_demo#29,marital_status_cust_demo#24,hh_income_cust_demo#23,length_of_residence_cust_demo#26,ethnicity_cust_demo#25,gndr_cd_cust_demo#22,age_dt_cust_demo#21,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,education_desc#19]
 HashOuterJoin [i1_education_cust_demo#29], [education#18], LeftOuter, None
  Exchange (HashPartitioning

Is pair rdd join more efficient than regular rdd

2015-02-01 Thread Sunita Arvind
Hi All

We are joining large tables using Spark SQL and running into shuffle
issues. We have explored multiple options - using coalesce to reduce the number
of partitions, tuning various parameters like the disk buffer, reducing data in
chunks, etc. - which all seem to help, by the way. What I would like to know is:
is having a pair RDD instead of a regular RDD one of the solutions? Will it make
the join more efficient, since Spark can shuffle better when it knows the
key? Logically speaking I think it should help, but I haven't found any
evidence on the internet, including the Spark SQL documentation.
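
To make the idea concrete, this is roughly what I mean (an untested sketch; the
key and field names are illustrative): key both sides, co-partition them with the
same partitioner and persist, so the subsequent join does not re-shuffle either side.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val partitioner = new HashPartitioner(200)

val factByKey = factRdd.map(r => (r.customerId, r))
  .partitionBy(partitioner)
  .persist(StorageLevel.MEMORY_AND_DISK)

val dimByKey = dimRdd.map(r => (r.customerId, r))
  .partitionBy(partitioner)
  .persist(StorageLevel.MEMORY_AND_DISK)

// both sides share the partitioner, so the join is a narrow dependency (no extra shuffle)
val joined = factByKey.join(dimByKey)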

It is a lot of effort for us to try this approach and weigh the
performance, as we need to register the output as tables to proceed with using
them. Hence I would appreciate inputs from the community before proceeding.


Regards
Sunita Koppar


Re: Spark job stuck at RangePartitioner at Exchange.scala:79

2015-01-21 Thread Sunita Arvind
I was able to resolve this by adding rdd.collect() after every stage. This
enforced RDD evaluation and helped avoid the choke point.

regards
Sunita Koppar

On Sat, Jan 17, 2015 at 12:56 PM, Sunita Arvind sunitarv...@gmail.com
wrote:

 Hi,

 My spark jobs suddenly started getting hung and here is the debug leading
 to it:
 Following the program, it seems to be stuck whenever I do any collect(),
 count or rdd.saveAsParquet file. AFAIK, any operation that requires data
 flow back to master causes this. I increased the memory to 5 MB. Also, as
 per the debug statements, the memory is sufficient enough. Also increased
 -Xss and

 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called
 with curMem=0, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 258.6 KB, free 972.3 MB)
 15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at
 SparkPlan.scala:85
 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called
 with curMem=264808, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as
 values in memory (estimated size 205.4 KB, free 972.1 MB)
 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called
 with curMem=475152, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as
 values in memory (estimated size 275.6 KB, free 971.8 MB)
 15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner
 at Exchange.scala:79

 A bit of background which may or may not be relevant. The program was
 working fine in eclipse, however, was getting hung upon submission to the
 cluster. In an attempt to debug, I changed the version in build.sbt to
 match the one on the cluster

 sbt config when the program was working:
    "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
    "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
    "spark.jobserver" % "job-server-api" % "0.4.0",
    "com.github.nscala-time" %% "nscala-time" % "1.6.0",
    "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"


 During debugging, I changed this to:
    "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
    "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
    "spark.jobserver" % "job-server-api" % "0.4.0",
    "com.github.nscala-time" %% "nscala-time" % "1.6.0",
    "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"

 This is when the program started getting hung at the first rdd.count().
 Now, even after reverting the changes in build.sbt, my program is getting
 hung at the same point.

 Tried these config changes in addition to -Xmx and -Xss in the eclipse.ini
 to 5MB each and set the below vars programatically

 sparkConf.set("spark.akka.frameSize", "10")
 sparkConf.set("spark.shuffle.spill", "true")
 sparkConf.set("spark.driver.memory", "512m")
 sparkConf.set("spark.executor.memory", "1g")
 sparkConf.set("spark.driver.maxResultSize", "1g")

 Please note. In eclipse as well as sbt the program kept throwing
 StackOverflow. Increasing Xss to 5 MB eliminated the problem,
 Could this be something unrelated to memory? The SchemaRDDs have close to
 400 columns and hence I am using StructType(StructField) and performing
 applySchema.

 My code cannot be shared right now. If required, I will edit it and post.
 regards
 Sunita





Re: Scala Spark SQL row object Ordinal Method Call Aliasing

2015-01-20 Thread Sunita Arvind
The below is not exactly a solution to your question, but this is what we
are doing. The first time we do end up doing row.getString(), and we
immediately pass the rows through a map function which aligns them to either a
case class or a StructType. Then we register the result as a table and use just
column names. The Spark SQL wiki has good examples of this. It looks easier
to manage to me than your solution below.

Agree with you on the fact that when there are a lot of columns, calling
row.getString() even once is not convenient.
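
As a sketch, the approach looks roughly like this (Spark 1.2-era API; the source
RDD, column layout and names are illustrative):

case class Person(id: Int, name: String)

import sqlContext.createSchemaRDD          // implicit conversion RDD[case class] -> SchemaRDD
val people = rowRdd.map(row => Person(row.getInt(0), row.getString(1)))
people.registerTempTable("people")
val result = sqlContext.sql("SELECT id, name FROM people")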

Regards

Sunita

On Tuesday, January 20, 2015, Night Wolf nightwolf...@gmail.com wrote:

 In Spark SQL we have Row objects which contain a list of fields that make
 up a row. A Rowhas ordinal accessors such as .getInt(0) or getString(2).

 Say ordinal 0 = ID and ordinal 1 = Name. It becomes hard to remember what
 ordinal is what, making the code confusing.

 Say for example I have the following code

 def doStuff(row: Row) = {
   //extract some items from the row into a tuple;
   (row.getInt(0), row.getString(1)) //tuple of ID, Name}

 The question becomes how could I create aliases for these fields in a Row
 object?

 I was thinking I could create methods which take a implicit Row object;

 def id(implicit row: Row) = row.getInt(0)def name(implicit row: Row) = 
 row.getString(1)

 I could then rewrite the above as;

 def doStuff(implicit row: Row) = {
   //extract some items from the row into a tuple;
   (id, name) //tuple of ID, Name}

 Is there a better/neater approach?


 Cheers,

 ~NW



Spark job stuck at RangePartitioner at Exchange.scala:79

2015-01-17 Thread Sunita Arvind
Hi,

My Spark jobs suddenly started getting hung, and here is the debugging leading
up to it:
Following the program, it seems to be stuck whenever I do any collect(),
count or rdd.saveAsParquetFile. AFAIK, any operation that requires data to
flow back to the master causes this. I increased the memory to 5 MB. Also, as
per the debug statements, the memory is sufficient. Also increased
-Xss and

15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called
with curMem=0, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 258.6 KB, free 972.3 MB)
15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at
SparkPlan.scala:85
15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called
with curMem=264808, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as
values in memory (estimated size 205.4 KB, free 972.1 MB)
15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called
with curMem=475152, maxMem=1019782103
15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 275.6 KB, free 971.8 MB)
15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner
at Exchange.scala:79

A bit of background which may or may not be relevant: the program was
working fine in Eclipse; however, it was getting hung upon submission to the
cluster. In an attempt to debug, I changed the version in build.sbt to
match the one on the cluster

sbt config when the program was working:
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "spark.jobserver" % "job-server-api" % "0.4.0",
  "com.github.nscala-time" %% "nscala-time" % "1.6.0",
  "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"


During debugging, I changed this to:
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided",
  "spark.jobserver" % "job-server-api" % "0.4.0",
  "com.github.nscala-time" %% "nscala-time" % "1.6.0",
  "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided"

This is when the program started getting hung at the first rdd.count().
Now, even after reverting the changes in build.sbt, my program is getting
hung at the same point.

Tried these config changes in addition to setting -Xmx and -Xss in eclipse.ini
to 5 MB each, and set the below vars programmatically:

sparkConf.set("spark.akka.frameSize", "10")
sparkConf.set("spark.shuffle.spill", "true")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "1g")
sparkConf.set("spark.driver.maxResultSize", "1g")

Please note: in Eclipse as well as sbt, the program kept throwing
StackOverflowError; increasing -Xss to 5 MB eliminated that problem.
Could this be something unrelated to memory? The SchemaRDDs have close to
400 columns, hence I am using StructType(StructField) and performing
applySchema.

My code cannot be shared right now. If required, I will edit it and post.
regards
Sunita


Transform SchemaRDDs into new SchemaRDDs

2014-12-08 Thread Sunita Arvind
Hi,

I need to generate some flags based on certain columns and add them back to
the SchemaRDD for further operations. Do I have to use a case class (via
reflection) or specify the schema programmatically? I am using Parquet files,
so the schema is being derived automatically. This is a great feature, thanks
to the Spark developers; however, if a subsequent createSchemaRDD doesn't work,
this feature seems unusable for more advanced uses. I hope there is some way of
doing it and I am just missing something. This is what I am attempting.
Appreciate your help.


Here is my code block:

import sqlContext.createSchemaRDD
val YR_indv_purchase = createSchemaRDD(indv_purchase.map { row => {
    val v_YR = scala.math.ceil(monthsBetween(row.getString(2),
      row.getString(5))(dateFormats.MMDDHHMISS))
    val YR = "YR" + v_YR.toString()
    (row, YR)
  }
}).registerTempTable("YRIndvPurchase")


---
--- just for completeness, here are the functions being used
import com.github.nscala_time.time.Imports._
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.Months

object dateFormats {
  val MMDD = DateTimeFormat.forPattern(MMDD)
}

def toDateTime(dtString: String)(implicit fmt: DateTimeFormatter): DateTime
=
fmt.parseDateTime(dtString)


 def monthsBetween(FromDT: String, ToDT: String)(implicit fmt:
DateTimeFormatter): Int =
Months.monthsBetween(toDateTime(FromDT)(fmt),
toDateTime(ToDT)(fmt)).getMonths



This compiles ok and throws a runtime exception as below:

Exception in thread main scala.MatchError: org.apache.spark.sql.Row (of
class scala.reflect.internal.Types$TypeRef$$anon$3)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:72)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
at
org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)
at croevss.StageJoin$.vsswf(StageJoin.scala:162)
at croevss.StageJoin$.main(StageJoin.scala:41)
at croevss.StageJoin.main(StageJoin.scala)
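
One way around the MatchError above, since the reflection-based createSchemaRDD
cannot derive a schema for a tuple that contains a Row, is to build the new rows
and the new schema explicitly and use applySchema. A rough, untested sketch,
assuming Spark 1.1+'s programmatic schema API (Row, StructType, applySchema);
the names are taken from the snippet above:

import org.apache.spark.sql._

val withFlag = indv_purchase.map { row =>
  val v_YR = scala.math.ceil(monthsBetween(row.getString(2),
    row.getString(5))(dateFormats.MMDDHHMISS))
  Row((row.toSeq :+ ("YR" + v_YR.toString)): _*)   // original columns plus the derived flag
}
val newSchema = StructType(indv_purchase.schema.fields :+
  StructField("YR", StringType, nullable = true))
sqlContext.applySchema(withFlag, newSchema).registerTempTable("YRIndvPurchase")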


regards
Sunita Koppar


Re: Spark setup on local windows machine

2014-12-02 Thread Sunita Arvind
Thanks Sameer and Akhil for your help.

I tried both your suggestions; however, I still face the same issue. There
was indeed a space in the installation path for Scala and sbt, since I had let
the defaults stay and hence the path was C:\Program Files. I
reinstalled Scala and sbt under C:\, as well as Spark.
The Spark binaries I am using are built internally from source by our team,
and the same binary works for the rest of the team.

Here is what the spark-env.cmd looks like for me:
set SCALA_HOME=C:\scala
set
SPARK_CLASSPATH=C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucleus-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar;C:\scala\lib\scala-library.jar;C:\scala\lib\scala-compiler.jar;



I get the same error in spite of this. The compute-classpath.cmd yields
correct results:

C:\spark\bin>spark-shell
Exception in thread main java.util.NoSuchElementException: key not found:
CLAS
SPATH
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at
org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubm
itDriverBootstrapper.scala:49)
at
org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmi
tDriverBootstrapper.scala)

regards
Sunita

On Tue, Nov 25, 2014 at 11:47 PM, Sameer Farooqui same...@databricks.com
wrote:

 Hi Sunita,

 This gitbook may also be useful for you to get Spark running in local mode
 on your Windows machine:
 http://blueplastic.gitbooks.io/how-to-light-your-spark-on-a-stick/content/

 On Tue, Nov 25, 2014 at 11:09 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You could try following this guidelines
 http://docs.sigmoidanalytics.com/index.php/How_to_build_SPARK_on_Windows

 Thanks
 Best Regards

 On Wed, Nov 26, 2014 at 12:24 PM, Sunita Arvind sunitarv...@gmail.com
 wrote:

 Hi All,

 I just installed a spark on my laptop and trying to get spark-shell to
 work. Here is the error I see:

 C:\spark\binspark-shell
 Exception in thread main java.util.NoSuchElementException: key not
 found: CLAS
 SPATH
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.MapLike$class.apply(MapLike.scala:141)
 at scala.collection.AbstractMap.apply(Map.scala:58)
 at
 org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubm
 itDriverBootstrapper.scala:49)
 at
 org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmi
 tDriverBootstrapper.scala)


 The classpath seems to be right:

 C:\spark\bincompute-classpath.cmd

 ;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;

 ;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucle
 us-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar

 Manually exporting the classpath to include the assembly jar doesnt help
 either.

 What could be wrong with this installation? Scala and SBT are installed,
 in path and are working fine.

 Appreciate your help.
 regards
 Sunita







Spark setup on local windows machine

2014-11-25 Thread Sunita Arvind
Hi All,

I just installed Spark on my laptop and am trying to get spark-shell to
work. Here is the error I see:

C:\spark\bin>spark-shell
Exception in thread main java.util.NoSuchElementException: key not found:
CLAS
SPATH
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at
org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubm
itDriverBootstrapper.scala:49)
at
org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmi
tDriverBootstrapper.scala)


The classpath seems to be right:

C:\spark\bin>compute-classpath.cmd
;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;
;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucle
us-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar

Manually exporting the classpath to include the assembly jar doesn't help
either.

What could be wrong with this installation? Scala and SBT are installed, on
the path, and are working fine.

Appreciate your help.
regards
Sunita


GraphX usecases

2014-08-25 Thread Sunita Arvind
Hi,

I am exploring the GraphX library and trying to determine which use cases make
the most sense for/with it. From what I initially thought, it looked like
GraphX could be applied to data stored in RDBMSs, as Spark could translate
the relational data into a graph representation. However, there seems to
be no such conversion, and everything presented in GraphX implementations,
AFAIK, works on vertices and edges. So does it mean that GraphX is only
relevant when the backend is a GDBMS?

Does this ("We introduce GraphX, which combines the advantages of both
data-parallel and graph-parallel systems by efficiently expressing graph
computation within the Spark data-parallel framework. We leverage new ideas
in distributed graph representation to efficiently distribute graphs as
tabular data-structures. Similarly, we leverage advances in data-flow
systems to exploit in-memory computation and fault-tolerance.") mean that
GraphX makes the typical RDBMS operations possible even when the data is
persisted in a GDBMS, and not vice versa?
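
To make the first paragraph concrete, what I had in mind is something like this:
building a property graph from plain tabular RDDs (for example, rows pulled out of
an RDBMS) rather than from a graph database. A small illustrative sketch; the
users/follows tables and their columns are made up:

import org.apache.spark.graphx.{Edge, Graph}

// users: RDD[(Long, String)] of (userId, name); follows: RDD[(Long, Long)] of (srcId, dstId)
val vertices = users                                          // vertex id plus a name property
val edges    = follows.map { case (src, dst) => Edge(src, dst, 1) }

val graph = Graph(vertices, edges)
println(graph.numVertices + " vertices, " + graph.numEdges + " edges")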

regards
Sunita


Re: Integrate Spark Editor with Hue for source compiled installation of spark/spark-jobServer

2014-07-03 Thread Sunita Arvind
That's good to know. I will try it out.

Thanks Romain

On Friday, June 27, 2014, Romain Rigaux romain.rig...@gmail.com wrote:

 So far Spark Job Server does not work with Spark 1.0:
 https://github.com/ooyala/spark-jobserver

 So this works only with Spark 0.9 currently:

 http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/

 Romain



 Romain


 On Tue, Jun 24, 2014 at 9:04 AM, Sunita Arvind sunitarv...@gmail.com
 javascript:_e(%7B%7D,'cvml','sunitarv...@gmail.com'); wrote:

 Hello Experts,

 I am attempting to integrate Spark Editor with Hue on CDH5.0.1. I have
 the spark installation build manually from the sources for spark1.0.0. I am
 able to integrate this with cloudera manager.

 Background:
 ---
 We have a 3 node VM cluster with CDH5.0.1
 We required spark1.0.0 due to some features in it, so I did a

  yum remove spark-core spark-master spark-worker spark-python

  of the default spark0.9.0 and compiled spark1.0.0 from source:

 Downloaded the spark-trunk from

 git clone https://github.com/apache/spark.git
 cd spark
 SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly

  The spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar was built and spark by
 itself seems to work well. I was even able to run a text file count.

 Current attempt:
 
 Referring to this article -
 http://gethue.com/a-new-spark-web-ui-spark-app/
 Now I am trying to add the Spark editor to Hue. AFAIK, this requires
 git clone https://github.com/ooyala/spark-jobserver.git
 cd spark-jobserver
 sbt
 re-start

 This was successful after a lot of struggle with the proxy settings.
 However, is this the job server itself? Does that mean the job server has
 to be started manually? I intend to have the Spark editor show up in the Hue
 web UI and I am nowhere close. Can someone please help?

 Note, the 3 VMs are Linux CentOS. Not sure if setting something like the
 below can be expected to work:

 [desktop]
 app_blacklist=


 Also, I have made the changes to
 ./job-server/src/main/resources/application.conf (via vim) as recommended;
 however, I do not expect this to impact Hue in any way.

 Also, I intend to let the editor stay available, not spawn it every time
 it is required.


 Thanks in advance.

 regards