Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-26 Thread Swetha Baskaran
Hi Enrico, Using Spark version 3.1.3 and turning AQE off seems to fix the
sorting. Looking into why, do you have thoughts?

Thanks, Swetha
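
For anyone reproducing this, a minimal sketch of turning AQE off for a single job follows. The setting name spark.sql.adaptive.enabled is the one described on the performance tuning page linked below; the application name and local master are placeholders and not taken from this thread.

```scala
import org.apache.spark.sql.SparkSession

object AqeOffSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder session: app name and master are assumptions for a local test run.
    val spark = SparkSession.builder()
      .appName("aqe-off-sketch")
      .master("local[*]")
      // Disable adaptive query execution for every query in this session.
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Print the effective value so the run log shows what the write job used.
    println(spark.conf.get("spark.sql.adaptive.enabled"))

    spark.stop()
  }
}
```

The same key can also be changed on an existing session with spark.conf.set before the write is triggered.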

On Sat, Sep 17, 2022 at 1:58 PM Enrico Minack 
wrote:

> Hi,
>
> from a quick glance over your transformations, sortCol should be sorted.
>
> Are you using Spark 3.2 or above? Can you try again with AQE turned off in
> that case?
>
>
> https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
>
> Enrico
>
>
>
> On 16.09.22 at 23:28, Swetha Baskaran wrote:
>
> Hi Enrico,
>
> Thank you for your response!
> Could you clarify what you mean by *values for "col1" will be "randomly"
> allocated to partition files*?
>
> We observe one file per partition, however we see an alternating pattern
> of unsorted rows in some files.
> Here is the code used and the unsorted pattern observed in the output
> files.
>
> df.repartition(col("day"), col("month"), col("year"))
>   .withColumn("partitionId", spark_partition_id)
>   .withColumn("monotonicallyIncreasingIdUnsorted", monotonicallyIncreasingId)
>   .sortWithinPartitions("year", "month", "day", "sortCol")
>   .withColumn("monotonicallyIncreasingIdSorted", monotonicallyIncreasingId)
>   .write
>   .partitionBy("year", "month", "day")
>   .parquet(path)
>
> 1
> +-------+-----------+---------------------------------+-------------------------------+
> |sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
> +-------+-----------+---------------------------------+-------------------------------+
> |     10|        732|                    6287832121344|                  6287832121344|
> |1170583|        732|                    6287842137820|                  6287876860586|
> |     10|        732|                    6287879216173|                  6287832121345|
> |1170583|        732|                    6287890351126|                  6287876860587|
> |     10|        732|                    6287832569336|                  6287832121346|
> |1170583|        732|                    6287843957457|                  6287876860588|
> |     10|        732|                    6287881576840|                  6287832121347|
> |1170583|        732|                    6287892533054|                  6287876860589|
> |     10|        732|                    6287833244394|                  6287832121348|
> |1170583|        732|                    6287847669077|                  6287876860590|
> |     10|        732|                    6287884414741|                  6287832121349|
> |1170583|        732|                    6287894723328|                  6287876860591|
> |     10|        732|                    6287833768679|                  6287832121350|
> |1170583|        732|                    6287849212375|                  6287876860592|
> |     10|        732|                    6287885330261|                  6287832121351|
> |1170583|        732|                    6287896605691|                  6287876860593|
> |     10|        732|                    6287835089415|                  6287832121352|
> |1170583|        732|                    6287851414977|                  6287876860594|
> |     10|        732|                    6287886356164|                  6287832121353|
> |1170583|        732|                    6287899702397|                  6287876860595|
> +-------+-----------+---------------------------------+-------------------------------+
>
> 2
> +-------+-----------+---------------------------------+-------------------------------+
> |sortCol|partitionId|monotonicallyIncreasingIdUnsorted|monotonicallyIncreasingIdSorted|
> +-------+-----------+---------------------------------+-------------------------------+
> |     10|        136|                    1168231104512|                  1168231104512|
> |1215330|        136|                    1168267800695|                  1168275843754|
> |     10|        136|                    1168365908174|                  1168231104513|
> |1215330|        136|                    1168272121474|                  1168275843755|
> |     10|        136|                    1168233930111|                  1168231104514|
> |1215330|        136|                    1168275020862|                  1168275843756|
> |     10|        136|                    1168369592448|                  1168231104515|
> |1215331|136|1168320

Re: [Spark Internals]: Is sort order preserved after partitioned write?

2022-09-16 Thread Swetha Baskaran
 581|4990789773711|
  4990796737195|
|     10|        581|                    4990754836237|                  4990751997954|
|1207875|        581|                    4990792883763|                  4990796737196|
|     10|        581|                    4990799663372|                  4990751997955|
|1207875|        581|                    4990795135016|                  4990796737197|
|     10|        581|                        499075488|                  4990751997956|
|1207875|        581|                    4990796258628|                  4990796737198|
|     10|        581|                    4990801912980|                  4990751997957|
|1207876|        581|                    4990798880125|                  4990796737199|
|     10|        581|                    4990755328908|                  4990751997958|
|1207876|        581|                    4990753105828|                  4990796737200|
|     10|        581|                    4990804520539|                  4990751997959|
|1207876|        581|                    4990800771248|                  4990796737201|
|     10|        581|                    4990756046653|                  4990751997960|
|1207876|        581|                    4990757154529|                  4990796737202|
|     10|        581|                    4990806212169|                  4990751997961|
|1207876|        581|                    4990803020856|                  4990796737203|
+-------+-----------+---------------------------------+-------------------------------+
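
The alternating pattern in the tables above can also be detected mechanically instead of by eye. The sketch below is plain Scala with no Spark dependency; the helper name orderBreaks is hypothetical, and the sample values are the first six monotonicallyIncreasingIdSorted entries of the last table. In a real check the values would come from reading one output file back in file order.

```scala
object SortCheckSketch {
  /** Indices i where values(i) < values(i - 1), i.e. where ascending order breaks. */
  def orderBreaks(values: Seq[Long]): Seq[Int] =
    values.indices.drop(1).filter(i => values(i) < values(i - 1))

  def main(args: Array[String]): Unit = {
    // First six monotonicallyIncreasingIdSorted values from the last table above.
    val sortedIds = Seq(
      4990751997954L, 4990796737196L, 4990751997955L,
      4990796737197L, 4990751997956L, 4990796737198L
    )
    // An empty result would mean the file is in ascending order.
    println(orderBreaks(sortedIds))
  }
}
```

For these six values the order breaks at indices 2 and 4, which matches two individually sorted runs being interleaved row by row.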



Thanks,
Swetha



On Fri, Sep 16, 2022 at 1:45 AM Enrico Minack 
wrote:

> Yes, you can expect each partition file to be sorted by "col1" and "col2".
>
> However, values for "col1" will be "randomly" allocated to partition
> files, but all rows with the same value for "col1" will reside in the same
> one partition file.
>
> What kind of unexpected sort order do you observe?
>
> Enrico
>
>
>
> On 16.09.22 at 05:42, Swetha Baskaran wrote:
>
> Hi!
>
> We expected the order of sorted partitions to be preserved after a
> dataframe write. We use the following code to write out one file per
> partition, with the rows sorted by a column.
>
> df.repartition($"col1")
>   .sortWithinPartitions("col1", "col2")
>   .write
>   .partitionBy("col1")
>   .csv(path)
>
> However we observe unexpected sort order in some files. Does spark
> guarantee sort order within partitions on write?
>
>
> Thanks,
> swebask
>
>
>
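
Enrico's point in the quoted reply, that a given "col1" value always lands in exactly one partition while the choice of partition looks arbitrary, can be illustrated without Spark. This is only a sketch of the idea: Spark's repartition uses its own internal hash function, so the hashCode-based partitionFor helper below is a hypothetical stand-in, and the keys and partition count are made up.

```scala
object KeyToPartitionSketch {
  // Illustrative stand-in only: Spark does not use String.hashCode for repartition.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val keys = Seq("a", "b", "a", "c", "b")
    // Equal keys always map to the same partition; distinct keys may or may not share one.
    val byPartition = keys.groupBy(partitionFor(_, 4))
    byPartition.toSeq.sortBy(_._1).foreach { case (partition, ks) =>
      println(s"partition $partition: ${ks.mkString(", ")}")
    }
  }
}
```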


[Spark Internals]: Is sort order preserved after partitioned write?

2022-09-15 Thread Swetha Baskaran
Hi!

We expected the order of sorted partitions to be preserved after a
dataframe write. We use the following code to write out one file per
partition, with the rows sorted by a column.

df.repartition($"col1")
  .sortWithinPartitions("col1", "col2")
  .write
  .partitionBy("col1")
  .csv(path)

However we observe unexpected sort order in some files. Does spark
guarantee sort order within partitions on write?


Thanks,
swebask


Read text file row by row and apply conditions

2019-09-29 Thread swetha kadiyala
Dear friends,

I am new to Spark. Can you please help me read the text file below using
Spark and Scala?

Sample data

bret|lee|A|12345|ae545|gfddfg|86786786
142343345||D|ae342
67567|6|U|aadfsd|34k4|84304|020|sdnfsdfn|3243|benej|32432|jsfsdf|3423
67564|67747|U|aad434|3435|843454|203|sdn454dfn|3233|gdfg|34325|sdfsddf|7657


The indicator type is in the third column of each row. If the indicator
type is A, I need to store that row in a table called Table1. If the
indicator type is D, I have to store the row in a separate table called
TableB, and if the indicator type is U, I have to store the row in a
separate table called Table3.

Can anyone show me how to read the file row by row, split the columns,
apply the condition on the indicator type, and store the column data in the
respective tables?

Thanks,
Swetha
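
The routing rule described above can be sketched in plain Scala before involving Spark at all. The table names are the ones from the question; the object and function names (IndicatorRoutingSketch, parse, route) are hypothetical, and how rows are finally written to each table is left open.

```scala
object IndicatorRoutingSketch {
  // Indicator to table mapping exactly as stated in the question.
  val tableForIndicator: Map[String, String] =
    Map("A" -> "Table1", "D" -> "TableB", "U" -> "Table3")

  /** Splits one pipe-delimited line; limit -1 keeps empty fields, including trailing ones. */
  def parse(line: String): Seq[String] = line.split("\\|", -1).toSeq

  /** Target table for a line, or None if the third column is missing or unknown. */
  def route(line: String): Option[String] =
    parse(line).lift(2).flatMap(tableForIndicator.get)

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "bret|lee|A|12345|ae545|gfddfg|86786786",
      "142343345||D|ae342",
      "67567|6|U|aadfsd|34k4|84304|020|sdnfsdfn|3243|benej|32432|jsfsdf|3423",
      "67564|67747|U|aad434|3435|843454|203|sdn454dfn|3233|gdfg|34325|sdfsddf|7657"
    )
    // Group the raw lines by their target table and report the counts.
    lines.groupBy(route).foreach { case (table, rows) =>
      println(s"${table.getOrElse("unrouted")}: ${rows.size} row(s)")
    }
  }
}
```

Because the rows have different numbers of columns, splitting each line by hand as above avoids forcing one schema on the whole file; the same route predicate could be applied to lines read through Spark.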


Re: Spark CSV Quote only NOT NULL

2019-07-13 Thread Swetha Ramaiah
Glad to help!

On Sat, Jul 13, 2019 at 12:17 PM Gourav Sengupta 
wrote:

> Hi Swetha,
> I always look into the source code a lot, but it never occurred to me to
> look into the test suite. Thanks a ton for the tip. It definitely gives
> quite a few ideas.
>
> Thanks and Regards,
> Gourav
>
> On Fri, Jul 12, 2019 at 6:51 AM Swetha Ramaiah 
> wrote:
>
>> Hi Anil
>>
>> That was an example. You can replace the quote character with double quotes. But
>> these options should give you an idea of how you want to treat nulls, empty
>> values and quotes.
>>
>> When I faced this issue, I forked the Spark repo and looked at the test
>> suite. This definitely helped me solve my issue.
>>
>> https://github.com/apache/spark/blob/v2.4.3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
>>
>> Regards
>> Swetha
>>
>> On Jul 11, 2019, at 4:09 PM, Anil Kulkarni  wrote:
>>
>> Hi Swetha,
>>
>> Thank you.
>> But we need the data to be quoted with ".
>> and when a field is null, we dont need the quotes around it.
>>
>> Example:
>> "A",,"B","C"
>> Thanks
>> Anil
>>
>> On Thu, Jul 11, 2019, 1:51 PM Swetha Ramaiah 
>> wrote:
>>
>>> If you are using Spark 2.4.0, I think you can try something like this:
>>>
>>> .option("quote", "\u")
>>> .option("emptyValue", "")
>>>
>>> .option("nullValue", null)
>>>
>>> Regards
>>> Swetha
>>>
>>>
>>>
>>> On Jul 11, 2019, at 1:45 PM, Anil Kulkarni  wrote:
>>>
>>> Hi Spark users,
>>>
>>> My question is :
>>> I am writing a Dataframe to csv.
>>> Option i am using as
>>> .option("quoteAll","true").
>>>
>>> This is quoting even null values and making them appear as an empty
>>> string.
>>>
>>> How do i make sure that quotes are enabled only for non null values?
>>>
>>> --
>>> Cheers,
>>> Anil Kulkarni
>>> about.me/anilkulkarni
>>>  http://anilkulkarni.com/ <http://about.me/anilkulkarni>
>>>
>>>
>>>
>> --
Regards,
Swetha


Re: Spark CSV Quote only NOT NULL

2019-07-11 Thread Swetha Ramaiah
Hi Anil

That was an example. You can replace the quote character with double quotes. But these
options should give you an idea of how you want to treat nulls, empty values and
quotes.

When I faced this issue, I forked the Spark repo and looked at the test suite.
This definitely helped me solve my issue.
https://github.com/apache/spark/blob/v2.4.3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Regards
Swetha
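
To make the target format from Anil's quoted message concrete ("A",,"B","C"), here is a minimal plain-Scala sketch of the rule "quote every non-null field, leave nulls empty". It is not Spark's CSV writer and makes no claim about which writer options produce this output; formatRow is a hypothetical helper that could serve as a reference when comparing what the options actually emit.

```scala
object QuoteNonNullSketch {
  /** Quotes each present field, doubling embedded quotes; a missing field stays empty and unquoted. */
  def formatRow(fields: Seq[Option[String]]): String =
    fields.map {
      case Some(value) => "\"" + value.replace("\"", "\"\"") + "\""
      case None        => ""
    }.mkString(",")

  def main(args: Array[String]): Unit =
    // The row from the example in the quoted message: A, null, B, C.
    println(formatRow(Seq(Some("A"), None, Some("B"), Some("C"))))
}
```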

> On Jul 11, 2019, at 4:09 PM, Anil Kulkarni  wrote:
> 
> Hi Swetha,
> 
> Thank you. 
> But we need the data to be quoted with ". 
> and when a field is null, we dont need the quotes around it.
> 
> Example:
> "A",,"B","C"
> Thanks
> Anil
> 
> On Thu, Jul 11, 2019, 1:51 PM Swetha Ramaiah wrote:
> If you are using Spark 2.4.0, I think you can try something like this:
> 
> .option("quote", "\u")
> .option("emptyValue", "")
> .option("nullValue", null)
> Regards
> Swetha
> 
> 
>> On Jul 11, 2019, at 1:45 PM, Anil Kulkarni wrote:
>> 
>> Hi Spark users,
>> 
>> My question is :
>> I am writing a Dataframe to csv. 
>> Option i am using as 
>> .option("quoteAll","true").
>> 
>> This is quoting even null values and making them appear as an empty string. 
>> 
>> How do i make sure that quotes are enabled only for non null values?
>> 
>> -- 
>> Cheers,
>> Anil Kulkarni
>> about.me/anilkulkarni
>> 
>>  http://anilkulkarni.com/
>>  <http://about.me/anilkulkarni>
> 



Re: Spark CSV Quote only NOT NULL

2019-07-11 Thread Swetha Ramaiah
If you are using Spark 2.4.0, I think you can try something like this:

.option("quote", "\u")
.option("emptyValue", "")
.option("nullValue", null)
Regards
Swetha


> On Jul 11, 2019, at 1:45 PM, Anil Kulkarni  wrote:
> 
> Hi Spark users,
> 
> My question is :
> I am writing a Dataframe to csv. 
> Option i am using as 
> .option("quoteAll","true").
> 
> This is quoting even null values and making them appear as an empty string. 
> 
> How do i make sure that quotes are enabled only for non null values?
> 
> -- 
> Cheers,
> Anil Kulkarni
> about.me/anilkulkarni
> 
>  http://anilkulkarni.com/
>  <http://about.me/anilkulkarni>



Re: Lag and queued up batches info in Structured Streaming UI

2018-06-27 Thread swetha kasireddy
Thanks TD, but the sql plan does not seem to provide any information on
which stage is taking longer time or to identify any bottlenecks about
various stages. Spark kafka Direct used to provide information about
various stages in a micro batch and the time taken by each stage. Is there
a way to find out stage level information like time taken by each stage,
shuffle read/write data etc? Do you have any documentation on how to use
SQL tab for troubleshooting?
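
For reference, the StreamingQueryListener route that TD mentions in the quoted reply below can be sketched as follows. This reports per-batch rates and the durationMs breakdown, not stage-level timings. The rate source, console sink, local master and the ten second run time are assumptions chosen only to make the example self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object ProgressListenerSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder local session for the demo.
    val spark = SparkSession.builder()
      .appName("progress-listener-sketch")
      .master("local[2]")
      .getOrCreate()

    // Log the progress report that Spark emits after each micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        println(s"batch=${p.batchId} inputRows/s=${p.inputRowsPerSecond} " +
          s"processedRows/s=${p.processedRowsPerSecond} durationMs=${p.durationMs}")
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })

    // Built-in test source and sink, used here only to generate some batches.
    val query = spark.readStream.format("rate").load()
      .writeStream.format("console").start()

    query.awaitTermination(10000L) // run for roughly ten seconds
    query.stop()
    spark.stop()
  }
}
```

A processedRowsPerSecond value that stays below inputRowsPerSecond is one rough signal that the query is falling behind its input.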

On Wed, Jun 20, 2018 at 6:07 PM, Tathagata Das 
wrote:

> Also, you can get information about the last progress made (input rates,
> etc.) from StreamingQuery.lastProgress, StreamingQuery.recentProgress, and
> using StreamingQueryListener.
> It's all documented - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> On Wed, Jun 20, 2018 at 6:06 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Structured Streaming does not maintain a queue of batch like DStream.
>> DStreams used to cut off batches at a fixed interval and put in a queue,
>> and a different thread processed queued batches. In contrast, Structured
>> Streaming simply cuts off and immediately processes a batch after the
>> previous batch finishes. So the question about queue size and lag does not
>> apply to Structured Streaming.
>>
>> That said, there is no UI for Structured Streaming. You can see the sql
>> plans for each micro-batch in the SQL tab.
>>
>>
>>
>>
>>
>> On Wed, Jun 20, 2018 at 12:12 PM, SRK  wrote:
>>
>>> hi,
>>>
>>> How do we get information like lag and queued up batches in Structured
>>> streaming? Following api does not seem to give any info about  lag and
>>> queued up batches similar to DStreams.
>>>
>>> https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/streaming/scheduler/BatchInfo.html
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
Hi Cody,

Following is the way that I am consuming data for a 60 second batch. Do you
see anything that is wrong with the way the data is getting consumed that
can cause slowness in performance?


val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(2),
  "session.timeout.ms" -> Integer.valueOf(6),
  "request.timeout.ms" -> Integer.valueOf(9),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)

val hubbleStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)

// Map each ConsumerRecord to a (key, value) pair.
val kafkaStreamRdd = hubbleStream.transform { rdd =>
  rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
}
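
One detail that may be worth checking in the snippet above: as far as I know, spark.streaming.kafka.consumer.cache.enabled is read from the Spark configuration rather than from the Kafka consumer parameters, so placing it in kafkaParams may have no effect on caching. A minimal sketch of setting it on the SparkConf instead, with a placeholder application name and the poll.ms value mentioned elsewhere in this thread:

```scala
import org.apache.spark.SparkConf

object ConsumerCacheConfSketch {
  def main(args: Array[String]): Unit = {
    // Spark-side settings belong on the SparkConf used to build the StreamingContext.
    val sparkConf = new SparkConf()
      .setAppName("kafka-direct-sketch") // placeholder name
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")
      .set("spark.streaming.kafka.consumer.poll.ms", "1024")

    // Print the values so it is visible which settings the job would start with.
    println(sparkConf.get("spark.streaming.kafka.consumer.cache.enabled"))
    println(sparkConf.get("spark.streaming.kafka.consumer.poll.ms"))
  }
}
```

If that is right, it could also explain why toggling the flag inside kafkaParams showed no difference in performance.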

On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with Cache being enabled.
>
> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with Cache being disabled.
>>
>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> So if you can run with cache enabled for some time, does that
>>> significantly affect the performance issue you were seeing?
>>>
>>> Those settings seem reasonable enough.   If preferred locations is
>>> behaving correctly you shouldn't need cached consumers for all 96
>>> partitions on any one executor, so that maxCapacity setting is
>>> probably unnecessary.
>>>
>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>> <swethakasire...@gmail.com> wrote:
>>> > Because I saw some posts that say that consumer cache  enabled will
>>> have
>>> > concurrentModification exception with reduceByKeyAndWIndow. I see those
>>> > errors as well after running for sometime with cache being enabled.
>>> So, I
>>> > had to disable it. Please see the tickets below.  We have 96
>>> partitions. So
>>> > if I enable cache, would teh following settings help to improve
>>> performance?
>>> >
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>> Integer.valueOf(96),
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>> Integer.valueOf(96),
>>> >
>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>> >
>>> >
>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>> >
>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>> >>
>>> >> Why are you setting consumer.cache.enabled to false?
>>> >>
>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com>
>>> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > What would be the appropriate settings to run Spark with Kafka 10?
>>> My
>>> >> > job
>>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>>> >> > very
>>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>>> Kafka 10
>>> >> > . I
>>> >> > see the following error sometimes . Please see the kafka parameters
>>> and
>>> >> > the
>>> >> > consumer strategy for creating the stream below. Any suggestions on
>>> how
>>> >> > to
>>> >> > run this with better performance would be of great help.
>>> >> >
>>> >> > java.lang.AssertionError: assertion failed: Failed to get records
>>> for
>>> >> > test
>>> >> > stream1 72 324027964 after polling for 12
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >   "bootstrap.servers" -> kafkaBroke

Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
There is no difference in performance even with Cache being enabled.

On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with Cache being disabled.
>
> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> So if you can run with cache enabled for some time, does that
>> significantly affect the performance issue you were seeing?
>>
>> Those settings seem reasonable enough.   If preferred locations is
>> behaving correctly you shouldn't need cached consumers for all 96
>> partitions on any one executor, so that maxCapacity setting is
>> probably unnecessary.
>>
>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>> <swethakasire...@gmail.com> wrote:
>> > Because I saw some posts that say that consumer cache  enabled will have
>> > concurrentModification exception with reduceByKeyAndWIndow. I see those
>> > errors as well after running for sometime with cache being enabled. So,
>> I
>> > had to disable it. Please see the tickets below.  We have 96
>> partitions. So
>> > if I enable cache, would teh following settings help to improve
>> performance?
>> >
>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>> Integer.valueOf(96),
>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>> Integer.valueOf(96),
>> >
>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>> >
>> >
>> > http://markmail.org/message/n4cdxwurlhf44q5x
>> >
>> > https://issues.apache.org/jira/browse/SPARK-19185
>> >
>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >>
>> >> Why are you setting consumer.cache.enabled to false?
>> >>
>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com>
>> wrote:
>> >> > Hi,
>> >> >
>> >> > What would be the appropriate settings to run Spark with Kafka 10? My
>> >> > job
>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> >> > very
>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>> Kafka 10
>> >> > . I
>> >> > see the following error sometimes . Please see the kafka parameters
>> and
>> >> > the
>> >> > consumer strategy for creating the stream below. Any suggestions on
>> how
>> >> > to
>> >> > run this with better performance would be of great help.
>> >> >
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for
>> >> > test
>> >> > stream1 72 324027964 after polling for 12
>> >> >
>> >> > val kafkaParams = Map[String, Object](
>> >> >   "bootstrap.servers" -> kafkaBrokers,
>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>> >> >   "auto.offset.reset" -> "latest",
>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >> >   "group.id" -> "test1"
>> >> > )
>> >> >
>> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
>> String](
>> >> > ssc,
>> >> > LocationStrategies.PreferConsistent,
>> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> >> > kafkaParams)
>> >> >   )
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >> >
>> >> > 
>> -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>>
>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
There is no difference in performance even with Cache being disabled.

On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org> wrote:

> So if you can run with cache enabled for some time, does that
> significantly affect the performance issue you were seeing?
>
> Those settings seem reasonable enough.   If preferred locations is
> behaving correctly you shouldn't need cached consumers for all 96
> partitions on any one executor, so that maxCapacity setting is
> probably unnecessary.
>
> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
> <swethakasire...@gmail.com> wrote:
> > Because I saw some posts that say that consumer cache  enabled will have
> > concurrentModification exception with reduceByKeyAndWIndow. I see those
> > errors as well after running for sometime with cache being enabled. So, I
> > had to disable it. Please see the tickets below.  We have 96 partitions.
> So
> > if I enable cache, would teh following settings help to improve
> performance?
> >
> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
> Integer.valueOf(96),
> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
> Integer.valueOf(96),
> >
> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
> >
> >
> > http://markmail.org/message/n4cdxwurlhf44q5x
> >
> > https://issues.apache.org/jira/browse/SPARK-19185
> >
> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Why are you setting consumer.cache.enabled to false?
> >>
> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > What would be the appropriate settings to run Spark with Kafka 10? My
> >> > job
> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
> >> > very
> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
> 10
> >> > . I
> >> > see the following error sometimes . Please see the kafka parameters
> and
> >> > the
> >> > consumer strategy for creating the stream below. Any suggestions on
> how
> >> > to
> >> > run this with better performance would be of great help.
> >> >
> >> > java.lang.AssertionError: assertion failed: Failed to get records for
> >> > test
> >> > stream1 72 324027964 after polling for 12
> >> >
> >> > val kafkaParams = Map[String, Object](
> >> >   "bootstrap.servers" -> kafkaBrokers,
> >> >   "key.deserializer" -> classOf[StringDeserializer],
> >> >   "value.deserializer" -> classOf[StringDeserializer],
> >> >   "auto.offset.reset" -> "latest",
> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
> >> >   "session.timeout.ms" -> Integer.valueOf(6),
> >> >   "request.timeout.ms" -> Integer.valueOf(9),
> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >> >   "group.id" -> "test1"
> >> > )
> >> >
> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
> String](
> >> > ssc,
> >> > LocationStrategies.PreferConsistent,
> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
> >> > kafkaParams)
> >> >   )
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >
> >
> >
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread swetha kasireddy
Because I saw some posts that say that enabling the consumer cache causes a
ConcurrentModificationException with reduceByKeyAndWindow. I see those
errors as well after running for some time with the cache enabled. So, I
had to disable it. Please see the tickets below. We have 96 partitions. So
if I enable the cache, would the following settings help to improve
performance?

"spark.streaming.kafka.consumer.cache.initialCapacity" -> Integer.valueOf(12),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(15),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185


Also, I have a batch of 60 seconds. What do you suggest the following  to
be?

 session.timeout.ms, heartbeat.interval.ms
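
As a starting point for the two values asked about here, the sketch below applies two commonly cited rules: keep session.timeout.ms above the batch interval, and keep heartbeat.interval.ms at no more than a third of the session timeout. The 30 second headroom and the suggest helper are assumptions for illustration, not recommendations from this thread, and the session timeout still has to fall within the limits configured on the broker.

```scala
object KafkaTimeoutSketch {
  final case class Timeouts(sessionTimeoutMs: Int, heartbeatIntervalMs: Int)

  /** Session timeout = batch interval plus headroom; heartbeat = one third of the session timeout. */
  def suggest(batchIntervalMs: Int, headroomMs: Int = 30000): Timeouts = {
    val session = batchIntervalMs + headroomMs
    Timeouts(session, session / 3)
  }

  def main(args: Array[String]): Unit = {
    // 60 second batch, as described above.
    val t = suggest(batchIntervalMs = 60000)
    println(s"session.timeout.ms=${t.sessionTimeoutMs}, heartbeat.interval.ms=${t.heartbeatIntervalMs}")
  }
}
```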

On Fri, Aug 25, 2017 at 5:04 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would teh following settings help to improve
> performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
>> 10 . I
>> > see the following error sometimes . Please see the kafka parameters and
>> the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> test
>> > stream1 72 324027964 after polling for 12
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> > ssc,
>> > LocationStrategies.PreferConsistent,
>> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> kafkaParams)
>> >   )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread swetha kasireddy
Because I saw some posts that say that enabling the consumer cache causes a
ConcurrentModificationException with reduceByKeyAndWindow. I see those
errors as well after running for some time with the cache enabled. So, I
had to disable it. Please see the tickets below. We have 96 partitions. So
if I enable the cache, would the following settings help to improve
performance?

"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185

On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger  wrote:

> Why are you setting consumer.cache.enabled to false?
>
> On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
> > Hi,
> >
> > What would be the appropriate settings to run Spark with Kafka 10? My job
> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
> . I
> > see the following error sometimes . Please see the kafka parameters and
> the
> > consumer strategy for creating the stream below. Any suggestions on how
> to
> > run this with better performance would be of great help.
> >
> > java.lang.AssertionError: assertion failed: Failed to get records for
> test
> > stream1 72 324027964 after polling for 12
> >
> > val kafkaParams = Map[String, Object](
> >   "bootstrap.servers" -> kafkaBrokers,
> >   "key.deserializer" -> classOf[StringDeserializer],
> >   "value.deserializer" -> classOf[StringDeserializer],
> >   "auto.offset.reset" -> "latest",
> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
> >   "session.timeout.ms" -> Integer.valueOf(6),
> >   "request.timeout.ms" -> Integer.valueOf(9),
> >   "enable.auto.commit" -> (false: java.lang.Boolean),
> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >   "group.id" -> "test1"
> > )
> >
> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> > ssc,
> > LocationStrategies.PreferConsistent,
> > ConsumerStrategies.Subscribe[String, String](topicsSet,
> kafkaParams)
> >   )
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-21 Thread swetha kasireddy
Hi Cody,

I think Assign is used if we want to start from a specified offset.
What if we want to start from the latest offset, as we would get with
"auto.offset.reset" -> "latest"?


Thanks!
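
One possible way to combine the two ideas, sketched under assumptions: look up the current end offset of every partition with a plain Kafka consumer, then hand those offsets to the consumer strategy as explicit starting positions. The latestOffsets helper is hypothetical, and the String key and value types assume the StringDeserializer settings used elsewhere in these threads. Only consumer calls that I believe exist in the 0.10 client are used (partitionsFor, assign, seekToEnd, position).

```scala
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object LatestOffsetsSketch {
  /** Looks up the current end offset of every partition of `topic`. */
  def latestOffsets(topic: String, kafkaParams: Map[String, Object]): Map[TopicPartition, Long] = {
    val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
    try {
      val partitions: ju.List[TopicPartition] =
        consumer.partitionsFor(topic).asScala
          .map(info => new TopicPartition(info.topic, info.partition))
          .asJava
      // Manual assignment, so no consumer group rebalance is triggered.
      consumer.assign(partitions)
      consumer.seekToEnd(partitions)
      // position() resolves the lazy seek and returns the end offset.
      partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
    } finally {
      consumer.close()
    }
  }
}
```

The resulting map could then be passed as the third argument of ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets), which keeps the same group id while ignoring the offsets stored for it.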

On Mon, Aug 21, 2017 at 9:06 AM, Cody Koeninger  wrote:

> Yes, you can start from specified offsets.  See ConsumerStrategy,
> specifically Assign
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store
>
> On Tue, Aug 15, 2017 at 1:18 PM, SRK  wrote:
> > Hi,
> >
> > How to force Spark Kafka Direct to start from the latest offset when the
> lag
> > is huge in kafka 10? It seems to be processing from the latest offset
> stored
> > for a group id. One way to do this is to change the group id. But it
> would
> > mean that each time that we need to process the job from the latest
> offset
> > we have to provide a new group id.
> >
> > Is there a way to force the job to run from the latest offset in case we
> > need to and still use the same group id?
> >
> > Thanks!
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
> >
>


Fwd: Issues when trying to recover a textFileStream from checkpoint in Spark streaming

2017-08-11 Thread swetha kasireddy
Hi,

I am facing issues while trying to recover a textFileStream from a checkpoint.
On recovery it tries to load the files from the beginning of the job, whereas I
delete the files after processing them. I have the following configs set, so I
expected it not to look for files older than 2 minutes when recovering from the
checkpoint. Any suggestions on this would be of great help.

  sparkConf.set("spark.streaming.minRememberDuration","120s")
  sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s")

Thanks,
Swetha




-- Forwarded message --
From: SRK <swethakasire...@gmail.com>
Date: Thu, Aug 10, 2017 at 5:04 PM
Subject: Issues when trying to recover a textFileStream from checkpoint in
Spark streaming
To: user@spark.apache.org


Hi,

I am facing issues while trying to recover a textFileStream from a checkpoint.
On recovery it tries to load the files from the beginning of the job, whereas I
delete the files after processing them. I have the following configs set, so I
expected it not to look for files older than 2 minutes when recovering from the
checkpoint. Any suggestions on this would be of great help.

  sparkConf.set("spark.streaming.minRememberDuration","120s")
  sparkConf.set("spark.streaming.fileStream.minRememberDuration","120s")

Thanks,
Swetha



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issues-when-trying-to-recover-a-textFileStream-from-checkpoint-in-Spark-streaming-tp29052.html


Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-13 Thread swetha kasireddy
OK. Thanks TD. Does stateSnapshots() return a snapshot of the state of
all the keys managed by mapWithState, or just the state of the keys in the
current micro batch? The two sources conflict: the following link says it
returns the state only for the keys seen in the current batch, but the code
documentation says it returns the state for all the keys. Also, if we set a
timeout, does stateSnapshots() emit the state of an expired key one last time?

wordStream.mapWithState(stateSpec).stateSnapshots()

https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html
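
For what it's worth, the Spark API documentation for stateSnapshots()
describes each RDD as a snapshot of the state of all the keys, while the stream
returned by mapWithState itself only carries output for keys that were touched
in the batch. The toy model below is plain Scala, not Spark; the names
StateSnapshotModel and step are made up for illustration. It only shows the
difference between the two views for a running word count:

```scala
object StateSnapshotModel {
  // Toy stand-in for mapWithState with a running count per word (no Spark involved).
  // Returns (full state after the batch, output for keys seen in this batch only).
  def step(state: Map[String, Int], batch: Seq[String]): (Map[String, Int], Map[String, Int]) = {
    // Count the occurrences of each word in the incoming batch.
    val batchCounts: Map[String, Int] =
      batch.groupBy(identity).map { case (word, occurrences) => word -> occurrences.size }
    // Fold the batch counts into the state that is kept for every key seen so far.
    val newState = batchCounts.foldLeft(state) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0) + n)
    }
    // The "mapped" view only contains keys that had input in this batch.
    val seenInBatch = batchCounts.map { case (word, _) => word -> newState(word) }
    (newState, seenInBatch)
  }

  def main(args: Array[String]): Unit = {
    val (state1, _) = step(Map.empty, Seq("a", "b", "a"))
    val (state2, mapped2) = step(state1, Seq("b"))
    println(s"mapped output for batch 2: $mapped2")
    println(s"snapshot after batch 2: $state2")
  }
}
```

In this model the second batch only contains "b", so the mapped view holds just
"b", while the snapshot still holds both "a" and "b".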

On Thu, Jul 13, 2017 at 3:00 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Yes. It does.
>
> On that note, Spark 2.2 (released couple of days ago) adds
> mapGroupsWithState in Structured Streaming.  That is like mapWithState on
> steroids. Just saying. :)
>
> On Thu, Jul 13, 2017 at 1:01 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> Do we need to specify checkpointing for mapWithState just like we do for
>> updateStateByKey?
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-mapWithState-need-checkpointing-to-be-specified-in-Spark-Streaming-tp28858.html
>>
>>
>


Re: How do I find the time taken by each step in a stage in a Spark Job

2017-07-09 Thread swetha kasireddy
Yes, the Spark UI has some information, but it is not that helpful for finding
out which particular stage is taking time.

On Wed, Jun 28, 2017 at 12:51 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> You can find the information from the spark UI
>
> ---Original---
> From: "SRK"
> Date: 2017/6/28 02:36:37
> To: "user"
> Subject: How do I find the time taken by each step in a stage in a Spark Job
>
> Hi,
>
> How do I find the time taken by each step in a stage in spark job? Also, how
> do I find the bottleneck in each step and if a stage is skipped because of
> the RDDs being persisted in streaming?
>
> I am trying to identify which step in a job is taking time in my Streaming job.
>
> Thanks!
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-find-the-time-taken-by-each-step-in-a-stage-in-a-Spark-Job-tp28796.html
>


Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-22 Thread swetha kasireddy
Hi TD,

I am still seeing this issue with any immutable data structure. Any idea why
this happens? I use scala.collection.immutable.List[String], and my reduce
and inverse reduce do the following:

visitorSet1 ++ visitorSet2

visitorSet1.filterNot(visitorSet2.contains(_))
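
One thing worth checking, independent of mutability: set union has no exact
inverse, so subtracting an old batch with a set difference can drop elements
that newer batches in the window still contribute. Whether that is what
triggers the exception here is an assumption, not something confirmed in this
thread. A minimal plain-Scala sketch, with no Spark involved (the
WindowedSetDemo object and the sample values are invented for illustration):

```scala
object WindowedSetDemo {
  // (latest timestamp, visitor ids), mirroring the value type used in the thread.
  type Visitors = (Long, Set[String])

  // Reduce: merge two values into a new immutable set; neither input is modified.
  def reduce(a: Visitors, b: Visitors): Visitors =
    (math.max(a._1, b._1), a._2 ++ b._2)

  // "Inverse" reduce as described in the thread: plain set difference.
  def invReduce(window: Visitors, leaving: Visitors): Visitors =
    (math.max(window._1, leaving._1), window._2 -- leaving._2)

  def main(args: Array[String]): Unit = {
    val batch1: Visitors = (1L, Set("a", "b"))
    val batch2: Visitors = (2L, Set("b", "c"))

    val window = reduce(batch1, batch2)               // both batches in the window
    val afterBatch1Leaves = invReduce(window, batch1) // batch1 slides out

    println(s"window:               ${window._2}")
    println(s"after batch1 leaves:  ${afterBatch1Leaves._2}")
    println(s"what batch2 contains: ${batch2._2}")
    // "b" is still present in batch2, but the set difference has removed it.
    println(s"equal: ${afterBatch1Leaves._2 == batch2._2}")
  }
}
```

If this applies, keeping a count per visitor (for example a Map[String, Int]
that is incremented in reduce and decremented in the inverse) is one way to
make the inverse exact.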



On Wed, Jun 7, 2017 at 8:43 AM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> I changed the datastructure to scala.collection.immutable.Set and I still
> see the same issue. My key is a String.  I do the following in my reduce
> and invReduce.
>
> visitorSet1 ++visitorSet2.toTraversable
>
>
> visitorSet1 --visitorSet2.toTraversable
>
> On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Yes, and in general any mutable data structure. You have to use immutable
>> data structures whose hashCode and equals are consistent enough for being
>> put in a set.
>>
>> On Jun 6, 2017 4:50 PM, "swetha kasireddy" <swethakasire...@gmail.com>
>> wrote:
>>
>>> Are you suggesting against the usage of HashSet?
>>>
>>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> This may be because HashSet is a mutable data structure, and it
>>>> seems you are actually mutating it in "set1 ++set2". I suggest creating a
>>>> new HashSet in the function (and adding both sets into it), rather than
>>>> mutating one of them.
>>>>
>>>> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>>>>> Streaming app. I use reduce, invReduce and filterFunction as shown below.
>>>>> Any idea as to why I get the error?
>>>>>
>>>>>  java.lang.Exception: Neither previous window has value for key, nor new
>>>>> values found. Are you sure your key class hashes consistently?
>>>>>
>>>>>
>>>>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>>>>   }
>>>>>
>>>>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>>>>   }
>>>>>
>>>>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>>>>     case (metricName: String, (timeStamp: Long, set: HashSet[String])) => set.size > 0
>>>>>   }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>>>>>
>>>>>
>>>>
>>>
>


Re: Is Structured streaming ready for production usage

2017-06-08 Thread swetha kasireddy
OK. Can we use Spark Kafka Direct with Structured Streaming?

On Thu, Jun 8, 2017 at 4:46 PM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> OK. Can we use Spark Kafka Direct as part of Structured Streaming?
>
> On Thu, Jun 8, 2017 at 3:35 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> YES. At Databricks, our customers have already been using Structured
>> Streaming and in the last month alone processed over 3 trillion records.
>>
>> https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
>>
>> On Thu, Jun 8, 2017 at 3:03 PM, SRK <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is structured streaming ready for production usage in Spark 2.2?
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-Structured-streaming-ready-for-production-usage-tp28751.html
>>>
>>>
>>
>


Re: Is Structured streaming ready for production usage

2017-06-08 Thread swetha kasireddy
OK. Can we use Spark Kafka Direct as part of Structured Streaming?

On Thu, Jun 8, 2017 at 3:35 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> YES. At Databricks, our customers have already been using Structured
> Streaming and in the last month alone processed over 3 trillion records.
>
> https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
>
> On Thu, Jun 8, 2017 at 3:03 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> Is structured streaming ready for production usage in Spark 2.2?
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-Structured-streaming-ready-for-production-usage-tp28751.html
>>
>>
>


Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-07 Thread swetha kasireddy
I changed the data structure to scala.collection.immutable.Set and I still
see the same issue. My key is a String. I do the following in my reduce
and invReduce:

visitorSet1 ++ visitorSet2.toTraversable

visitorSet1 -- visitorSet2.toTraversable

On Tue, Jun 6, 2017 at 8:22 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Yes, and in general any mutable data structure. You have to use immutable data
> structures whose hashCode and equals are consistent enough for being put in
> a set.
>
> On Jun 6, 2017 4:50 PM, "swetha kasireddy" <swethakasire...@gmail.com>
> wrote:
>
>> Are you suggesting against the usage of HashSet?
>>
>> On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> This may be because HashSet is a mutable data structure, and it seems
>>> you are actually mutating it in "set1 ++set2". I suggest creating a new
>>> HashSet in the function (and adding both sets into it), rather than mutating
>>> one of them.
>>>
>>> On Tue, Jun 6, 2017 at 11:30 AM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>>>> Streaming app. I use reduce, invReduce and filterFunction as shown below.
>>>> Any idea as to why I get the error?
>>>>
>>>>  java.lang.Exception: Neither previous window has value for key, nor new
>>>> values found. Are you sure your key class hashes consistently?
>>>>
>>>>
>>>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>>>   }
>>>>
>>>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>>>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>>>   }
>>>>
>>>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>>>     case (metricName: String, (timeStamp: Long, set: HashSet[String])) => set.size > 0
>>>>   }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>>>>
>>>>
>>>
>>


Re: Exception which using ReduceByKeyAndWindow in Spark Streaming.

2017-06-06 Thread swetha kasireddy
Are you suggesting against the usage of HashSet?

On Tue, Jun 6, 2017 at 3:36 PM, Tathagata Das 
wrote:

> This may be because HashSet is a mutable data structure, and it seems
> you are actually mutating it in "set1 ++set2". I suggest creating a new
> HashSet in the function (and adding both sets into it), rather than mutating
> one of them.
>
> On Tue, Jun 6, 2017 at 11:30 AM, SRK  wrote:
>
>> Hi,
>>
>> I see the following error when I use ReduceByKeyAndWindow in my Spark
>> Streaming app. I use reduce, invReduce and filterFunction as shown below.
>> Any idea as to why I get the error?
>>
>>  java.lang.Exception: Neither previous window has value for key, nor new
>> values found. Are you sure your key class hashes consistently?
>>
>>
>>   def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>       (Math.max(timeStamp1, timeStamp2), set1 ++ set2)
>>   }
>>
>>   def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
>>     case ((timeStamp1: Long, set1: HashSet[String]), (timeStamp2: Long, set2: HashSet[String])) =>
>>       (Math.max(timeStamp1, timeStamp2), set1.diff(set2))
>>   }
>>
>>   def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
>>     case (metricName: String, (timeStamp: Long, set: HashSet[String])) => set.size > 0
>>   }
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-which-using-ReduceByKeyAndWindow-in-Spark-Streaming-tp28748.html
>>
>>
>


Re: How to set hive configs in Spark 2.1?

2017-02-27 Thread swetha kasireddy
Would even Hive configurations like the following work with this?

sqlContext.setConf("hive.default.fileformat", "Orc")
sqlContext.setConf("hive.exec.orc.memory.pool", "1.0")
sqlContext.setConf("hive.optimize.sort.dynamic.partition", "true")
sqlContext.setConf("hive.exec.reducers.max", "2000")

On Mon, Feb 27, 2017 at 9:26 AM, neil90  wrote:

> All you need to do is -
>
> spark.conf.set("spark.sql.shuffle.partitions", 2000)
> spark.conf.set("spark.sql.orc.filterPushdown", True)
> ...etc
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-hive-configs-in-Spark-2-1-tp28429p28431.html
>
>


Re: How to spin up Kafka using docker and use for Spark Streaming Integration tests

2016-07-06 Thread swetha kasireddy
Can this Docker image be used to spin up a Kafka cluster in a CI/CD pipeline
like Jenkins to run the integration tests, or can that only be done on a
local machine that has Docker installed? I assume the box where the CI/CD
pipeline runs should have Docker installed, correct?
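
Either way, the container start-up can live in the test setup code, so that a
CI agent with Docker installed runs the same steps as a developer machine. The
sketch below only builds and runs the "docker run" command line. The port
mappings and the ADVERTISED_PORT variable are assumptions based on the
spotify/kafka image's documented usage, AUTO_CREATE_TOPICS is the variable Lars
mentions, and KafkaContainer, runCommand, start and stop are made-up names:

```scala
import scala.sys.process._

object KafkaContainer {
  // Builds the argument list for starting the spotify/kafka image in the background.
  // `advertisedHost` is the address test clients use to reach the broker.
  def runCommand(advertisedHost: String, name: String = "kafka-it"): Seq[String] = Seq(
    "docker", "run", "-d", "--name", name,
    "-p", "2181:2181", "-p", "9092:9092",
    "--env", s"ADVERTISED_HOST=$advertisedHost",
    "--env", "ADVERTISED_PORT=9092",
    "--env", "AUTO_CREATE_TOPICS=true",
    "spotify/kafka"
  )

  // Starts the container and returns the docker exit code; requires Docker on the machine.
  def start(advertisedHost: String): Int = runCommand(advertisedHost).!

  // Force-removes the container again, e.g. from a test teardown hook.
  def stop(name: String = "kafka-it"): Int = Seq("docker", "rm", "-f", name).!
}
```

Calling start from a beforeAll-style hook and stop from the matching teardown
keeps the Jenkins job itself free of Docker-specific steps.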

On Mon, Jul 4, 2016 at 5:20 AM, Lars Albertsson <la...@mapflat.com> wrote:

> I created such a setup for a client a few months ago. It is pretty
> straightforward, but it can take some work to get all the wires
> connected.
>
> I suggest that you start with the spotify/kafka
> (https://github.com/spotify/docker-kafka) Docker image, since it
> includes a bundled zookeeper. The alternative would be to spin up a
> separate Zookeeper Docker container and connect them, but for testing
> purposes, it would make the setup more complex.
>
> You'll need to inform Kafka about the external address it exposes by
> setting ADVERTISED_HOST to the output of "docker-machine ip" (on Mac)
> or the address printed by "ip addr show docker0" (Linux). I also
> suggest setting
> AUTO_CREATE_TOPICS to true.
>
> You can choose to run your Spark Streaming application under test
> (SUT) and your test harness also in Docker containers, or directly on
> your host.
>
> In the former case, it is easiest to set up a Docker Compose file
> linking the harness and SUT to Kafka. This variant provides better
> isolation, and might integrate better if you have existing similar
> test frameworks.
>
> If you want to run the harness and SUT outside Docker, I suggest that
> you build your harness with a standard test framework, e.g. scalatest
> or JUnit, and run both harness and SUT in the same JVM. In this case,
> you put code to bring up the Kafka Docker container in test framework
> setup methods. This test strategy integrates better with IDEs and
> build tools (mvn/sbt/gradle), since they will run (and debug) your
> tests without any special integration. I therefore prefer this
> strategy.
>
>
> What is the output of your application? If it is messages on a
> different Kafka topic, the test harness can merely subscribe and
> verify output. If you emit output to a database, you'll need another
> Docker container, integrated with Docker Compose. If you are emitting
> database entries, your test oracle will need to frequently poll the
> database for the expected records, with a timeout in order not to hang
> on failing tests.
>
> I hope this is comprehensible. Let me know if you have followup questions.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS
>
>
>
> On Thu, Jun 30, 2016 at 8:19 PM, SRK <swethakasire...@gmail.com> wrote:
> > Hi,
> >
> > I need to do integration tests using Spark Streaming. My idea is to spin up
> > kafka using docker locally and use it to feed the stream to my Streaming
> > Job. Any suggestions on how to do this would be of great help.
> >
> > Thanks,
> > Swetha
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-spin-up-Kafka-using-docker-and-use-for-Spark-Streaming-Integration-tests-tp27252.html
> >
>


Re: How to spin up Kafka using docker and use for Spark Streaming Integration tests

2016-07-06 Thread swetha kasireddy
The application's output is data inserted into Cassandra at the end of
every batch.
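
For a database sink like this, the polling oracle Lars describes could look
roughly like the plain-Scala helper below. PollingOracle, pollUntil and the
fetch callback are hypothetical names; in a real test, fetch would run the
Cassandra query, which is left out here:

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object PollingOracle {
  // Calls `fetch` repeatedly until `expected` accepts the result or `timeout` elapses.
  // Returns Some(result) on success and None on timeout, so a failing test cannot hang.
  def pollUntil[A](timeout: FiniteDuration, interval: FiniteDuration)
                  (fetch: () => A)(expected: A => Boolean): Option[A] = {
    val deadline = timeout.fromNow

    @tailrec
    def loop(): Option[A] = {
      val result = fetch()
      if (expected(result)) Some(result)
      else if (deadline.isOverdue()) None
      else {
        // Wait a little before asking the database again.
        Thread.sleep(interval.toMillis)
        loop()
      }
    }

    loop()
  }
}
```

A test would then assert that the returned Option is defined, for example by
polling for the expected number of rows with a timeout somewhat longer than
the batch interval.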

On Mon, Jul 4, 2016 at 5:20 AM, Lars Albertsson <la...@mapflat.com> wrote:

> I created such a setup for a client a few months ago. It is pretty
> straightforward, but it can take some work to get all the wires
> connected.
>
> I suggest that you start with the spotify/kafka
> (https://github.com/spotify/docker-kafka) Docker image, since it
> includes a bundled zookeeper. The alternative would be to spin up a
> separate Zookeeper Docker container and connect them, but for testing
> purposes, it would make the setup more complex.
>
> You'll need to inform Kafka about the external address it exposes by
> setting ADVERTISED_HOST to the output of "docker-machine ip" (on Mac)
> or the address printed by "ip addr show docker0" (Linux). I also
> suggest setting
> AUTO_CREATE_TOPICS to true.
>
> You can choose to run your Spark Streaming application under test
> (SUT) and your test harness also in Docker containers, or directly on
> your host.
>
> In the former case, it is easiest to set up a Docker Compose file
> linking the harness and SUT to Kafka. This variant provides better
> isolation, and might integrate better if you have existing similar
> test frameworks.
>
> If you want to run the harness and SUT outside Docker, I suggest that
> you build your harness with a standard test framework, e.g. scalatest
> or JUnit, and run both harness and SUT in the same JVM. In this case,
> you put code to bring up the Kafka Docker container in test framework
> setup methods. This test strategy integrates better with IDEs and
> build tools (mvn/sbt/gradle), since they will run (and debug) your
> tests without any special integration. I therefore prefer this
> strategy.
>
>
> What is the output of your application? If it is messages on a
> different Kafka topic, the test harness can merely subscribe and
> verify output. If you emit output to a database, you'll need another
> Docker container, integrated with Docker Compose. If you are emitting
> database entries, your test oracle will need to frequently poll the
> database for the expected records, with a timeout in order not to hang
> on failing tests.
>
> I hope this is comprehensible. Let me know if you have followup questions.
>
> Regards,
>
>
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS
>
>
>
> On Thu, Jun 30, 2016 at 8:19 PM, SRK <swethakasire...@gmail.com> wrote:
> > Hi,
> >
> > I need to do integration tests using Spark Streaming. My idea is to spin up
> > kafka using docker locally and use it to feed the stream to my Streaming
> > Job. Any suggestions on how to do this would be of great help.
> >
> > Thanks,
> > Swetha
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-spin-up-Kafka-using-docker-and-use-for-Spark-Streaming-Integration-tests-tp27252.html
> >
>


Re: Kryo ClassCastException during Serialization/deserialization in Spark Streaming

2016-06-23 Thread swetha kasireddy
sampleMap is populated inside a method that is called from
updateStateByKey.

On Thu, Jun 23, 2016 at 1:13 PM, Ted Yu  wrote:

> Can you illustrate how sampleMap is populated ?
>
> Thanks
>
> On Thu, Jun 23, 2016 at 12:34 PM, SRK  wrote:
>
>> Hi,
>>
>> I keep getting the following error in my Spark Streaming every now and then
>> after the job runs for say around 10 hours. I have those 2 classes
>> registered in kryo as shown below. sampleMap is a field in SampleSession
>> as shown below. Any suggestion as to how to avoid this would be of great
>> help!!
>>
>> public class SampleSession implements Serializable, Cloneable{
>>private Map sampleMap;
>> }
>>
>>  sparkConf.registerKryoClasses(Array( classOf[SampleSession],
>> classOf[Sample]))
>>
>>
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException:
>> com.test.Sample cannot be cast to java.lang.String
>> Serialization trace:
>> sampleMap (com.test.SampleSession)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:96)
>> at com.twitter.chill.Tuple6Serializer.write(TupleSerializers.scala:93)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>> at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
>> at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
>> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
>> at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:132)
>> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:793)
>> at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:175)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.ClassCastException: com.test.Sample cannot be cast to
>> java.lang.String
>> at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:82)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> ... 37 more
>>
>>
>>
>> --
>> View 

Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-15 Thread swetha kasireddy
Hi Mich,

No, I have not tried that. My requirement is to insert the data from an hourly
Spark batch job. How would inserting with the Hive CLI or beeline be
different?

Thanks,
Swetha



On Tue, Jun 14, 2016 at 10:44 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Swetha,
>
> Have you actually tried doing this in Hive using Hive CLI or beeline?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 14 June 2016 at 18:43, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> In all probability there is no user database created in Hive
>>
>> Create a database yourself
>>
>> sql("create database if not exists test")
>>
>> It would be helpful if you grasp some concept of Hive databases etc?
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 14 June 2016 at 15:40, Sree Eedupuganti <s...@inndata.in> wrote:
>>
>>> Hi Spark users, i am new to spark. I am trying to connect hive using
>>> SparkJavaContext. Unable to connect to the database. By executing the below
>>> code i can see only "default" database. Can anyone help me out. What i need
>>> is a sample program for Querying Hive results using SparkJavaContext. Need
>>> to pass any values like this.
>>>
>>> userDF.registerTempTable("userRecordsTemp")
>>>
>>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>>> sqlContext.sql("set hive.enforce.sorting = true; ")
>>>
>>>  public static void  main(String[] args ) throws Exception {
>>>   SparkConf sparkConf = new
>>> SparkConf().setAppName("SparkSQL").setMaster("local");
>>>   SparkContext  ctx=new SparkContext(sparkConf);
>>>   HiveContext  hiveql=new
>>> org.apache.spark.sql.hive.HiveContext(ctx);
>>>   DataFrame df=hiveql.sql("show databases");
>>>   df.show();
>>>   }
>>>
>>> Any suggestions, please. Thanks.
>>>
>>
>>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-14 Thread swetha kasireddy
Hi Bijay,

This approach might not work for me as I have to do partial
inserts/overwrites in a given table and data_frame.write.partitionBy will
overwrite the entire table.

Thanks,
Swetha

On Mon, Jun 13, 2016 at 9:25 PM, Bijay Pathak <bijay.pat...@cloudwick.com>
wrote:

> Hi Swetha,
>
> One option is to use a Hive version in which the above issue is fixed, which
> is Hive 2.0 or the Cloudera CDH build of Hive 1.2. One thing to remember is
> that what matters is not the Hive you have installed but the Hive that Spark
> is using, which in Spark 1.6 is Hive version 1.2 as of now.
>
> The workaround I did for this issue was to write dataframe directly using
> dataframe write method and to create the Hive Table on top of that, doing
> which my processing time was down  from 4+ hrs to just under 1 hr.
>
>
>
> data_frame.write.partitionBy('idPartitioner','dtPartitioner').orc("path/to/final/location")
>
> And ORC format is supported with HiveContext only.
>
> Thanks,
> Bijay
>
>
> On Mon, Jun 13, 2016 at 11:41 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> Following is  a sample code snippet:
>>
>>
>> val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId", "userRecord").persist()
>> System.out.println(" userRecsDF.partitions.size" + userRecsDF.partitions.size)
>>
>> userDF.registerTempTable("userRecordsTemp")
>>
>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>> sqlContext.sql("set hive.enforce.sorting = true; ")
>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
>> sqlContext.sql(
>>   """ from userRecordsTemp ps   insert overwrite table users
>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>> """.stripMargin)
>>
>>
>> On Mon, Jun 13, 2016 at 10:57 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi Bijay,
>>>
>>> If I am hitting this issue,
>>> https://issues.apache.org/jira/browse/HIVE-11940. What needs to be
>>> done? Incrementing to higher version of hive is the only solution?
>>>
>>> Thanks!
>>>
>>> On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Following is  a sample code snippet:
>>>>
>>>>
>>>> val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId", "userRecord").persist()
>>>> System.out.println(" userRecsDF.partitions.size" + userRecsDF.partitions.size)
>>>>
>>>> userDF.registerTempTable("userRecordsTemp")
>>>>
>>>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>>>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>>>> sqlContext.sql("set hive.enforce.sorting = true; ")
>>>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>>>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>>>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
>>>> )
>>>> sqlContext.sql(
>>>>   """ from userRecordsTemp ps   insert overwrite table users
>>>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>>>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>>>> """.stripMargin)
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <
>>>> bijay.pat...@cloudwick.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Looks like you are hitting this:
>>>>> https://issues.apache.org/jira/browse/HIVE-11940.
>>>>>
>>>>> Thanks,
>>>>> Bijay
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
>>>>> mich.talebz

Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread swetha kasireddy
Hi Mich,

Following is a sample code snippet:


val userDF =
userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
"userRecord").persist()
System.out.println(" userRecsDF.partitions.size"+
userRecsDF.partitions.size)

userDF.registerTempTable("userRecordsTemp")

sqlContext.sql("SET hive.default.fileformat=Orc  ")
sqlContext.sql("set hive.enforce.bucketing = true; ")
sqlContext.sql("set hive.enforce.sorting = true; ")
sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId STRING,
userRecord STRING) PARTITIONED BY (idPartitioner STRING, dtPartitioner
STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
sqlContext.sql(
  """ from userRecordsTemp ps   insert overwrite table users
partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
""".stripMargin)
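
For comparison, the direct-write workaround quoted elsewhere in this thread (write the DataFrame with partitionBy and create the Hive table on top of the output) might look roughly like this in Scala. This is only a sketch: the helper name writeUserRecords and the outputPath parameter are made up for illustration, and only the column names come from the snippet above. Registering the resulting directories as partitions of the Hive table is a separate step that is not shown here.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper (not from the thread): writes the DataFrame as ORC files,
// one directory per (idPartitioner, dtPartitioner) pair, without going through
// a Hive INSERT OVERWRITE statement.
def writeUserRecords(userDF: DataFrame, outputPath: String): Unit = {
  userDF.write
    // Directory layout: <outputPath>/idPartitioner=.../dtPartitioner=.../
    .partitionBy("idPartitioner", "dtPartitioner")
    // No save mode is set, so the default applies and the write fails
    // if outputPath already exists.
    .orc(outputPath)
}
```

With the DataFrame from the snippet above this would be called as writeUserRecords(userDF, "path/to/final/location").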


On Mon, Jun 13, 2016 at 10:57 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi Bijay,
>
> If I am hitting this issue,
> https://issues.apache.org/jira/browse/HIVE-11940, what needs to be done?
> Is upgrading to a higher version of Hive the only solution?
>
> Thanks!
>
> On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> Following is a sample code snippet:
>>
>>
>> val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
>> "userRecord").persist()
>> System.out.println(" userRecsDF.partitions.size"+
>> userRecsDF.partitions.size)
>>
>> userDF.registerTempTable("userRecordsTemp")
>>
>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>> sqlContext.sql("set hive.enforce.sorting = true; ")
>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
>> )
>> sqlContext.sql(
>>   """ from userRecordsTemp ps   insert overwrite table users
>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>> """.stripMargin)
>>
>>
>>
>>
>> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <
>> bijay.pat...@cloudwick.com> wrote:
>>
>>> Hello,
>>>
>>> Looks like you are hitting this:
>>> https://issues.apache.org/jira/browse/HIVE-11940.
>>>
>>> Thanks,
>>> Bijay
>>>
>>>
>>>
>>> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Can you provide a code snippet showing how you are populating the
>>>> target table from the temp table?
>>>>
>>>>
>>>> HTH
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 9 June 2016 at 23:43, swetha kasireddy <swethakasire...@gmail.com>
>>>> wrote:
>>>>
>>>>> No, I am reading the data from HDFS, transforming it, registering the
>>>>> data in a temp table using registerTempTable and then doing insert
>>>>> overwrite using Spark SQL's HiveContext.
>>>>>
>>>>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>> how are you doing the insert? from an existing table?
>>>>>>
>>>>>> Dr Mich Talebzadeh
>>>>>>
>>>>>> On 9 June 2016 at 21:16, Stephen Boesch <java...@gmail.com> wrote:
>>>>>>
>>>>>>> How many workers (/cpu cores) are assigned to this job?
>>>>>>>
>>>>>>> 2016-06-09 13:01 GMT-07:00 SRK <swethakasire...@gmail.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> How can I insert data into 2000 partitions (directories) of
>>>>>>>> ORC/Parquet at a time using Spark SQL? It does not seem to be
>>>>>>>> performant when I try to insert 2000 directories of Parquet/ORC
>>>>>>>> using Spark SQL. Has anyone faced this issue?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-into-2000-partitions-directories-of-ORC-parquet-at-a-time-using-Spark-SQL-tp27132.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>>
>>>>>>>> -
>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread swetha kasireddy
Hi Bijay,

If I am hitting this issue,
https://issues.apache.org/jira/browse/HIVE-11940, what needs to be done?
Is upgrading to a higher version of Hive the only solution?

Thanks!

On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi,
>
> Following is a sample code snippet:
>
>
> val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
> "userRecord").persist()
> System.out.println(" userRecsDF.partitions.size"+
> userRecsDF.partitions.size)
>
> userDF.registerTempTable("userRecordsTemp")
>
> sqlContext.sql("SET hive.default.fileformat=Orc  ")
> sqlContext.sql("set hive.enforce.bucketing = true; ")
> sqlContext.sql("set hive.enforce.sorting = true; ")
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
> )
> sqlContext.sql(
>   """ from userRecordsTemp ps   insert overwrite table users
> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
> """.stripMargin)
>
>
>
>
> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <bijay.pat...@cloudwick.com
> > wrote:
>
>> Hello,
>>
>> Looks like you are hitting this:
>> https://issues.apache.org/jira/browse/HIVE-11940.
>>
>> Thanks,
>> Bijay
>>
>>
>>
>> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Can you provide a code snippet showing how you are populating the
>>> target table from the temp table?
>>>
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 9 June 2016 at 23:43, swetha kasireddy <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> No, I am reading the data from HDFS, transforming it, registering the
>>>> data in a temp table using registerTempTable and then doing insert
>>>> overwrite using Spark SQL's HiveContext.
>>>>
>>>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> how are you doing the insert? from an existing table?
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On 9 June 2016 at 21:16, Stephen Boesch <java...@gmail.com> wrote:
>>>>>
>>>>>> How many workers (/cpu cores) are assigned to this job?
>>>>>>
>>>>>> 2016-06-09 13:01 GMT-07:00 SRK <swethakasire...@gmail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> How can I insert data into 2000 partitions (directories) of
>>>>>>> ORC/Parquet at a time using Spark SQL? It does not seem to be
>>>>>>> performant when I try to insert 2000 directories of Parquet/ORC
>>>>>>> using Spark SQL. Has anyone faced this issue?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread swetha kasireddy
Hi,

Following is a sample code snippet:


val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
"userRecord").persist()
System.out.println(" userRecsDF.partitions.size"+
userRecsDF.partitions.size)

userDF.registerTempTable("userRecordsTemp")

sqlContext.sql("SET hive.default.fileformat=Orc  ")
sqlContext.sql("set hive.enforce.bucketing = true; ")
sqlContext.sql("set hive.enforce.sorting = true; ")
sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId STRING,
userRecord STRING) PARTITIONED BY (idPartitioner STRING, dtPartitioner
STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
sqlContext.sql(
  """ from userRecordsTemp ps   insert overwrite table users
partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
""".stripMargin)




On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <bijay.pat...@cloudwick.com>
wrote:

> Hello,
>
> Looks like you are hitting this:
> https://issues.apache.org/jira/browse/HIVE-11940.
>
> Thanks,
> Bijay
>
>
>
> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> Can you provide a code snippet showing how you are populating the target
>> table from the temp table?
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> On 9 June 2016 at 23:43, swetha kasireddy <swethakasire...@gmail.com>
>> wrote:
>>
>>> No, I am reading the data from HDFS, transforming it, registering the
>>> data in a temp table using registerTempTable and then doing insert
>>> overwrite using Spark SQL's HiveContext.
>>>
>>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> how are you doing the insert? from an existing table?
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>> On 9 June 2016 at 21:16, Stephen Boesch <java...@gmail.com> wrote:
>>>>
>>>>> How many workers (/cpu cores) are assigned to this job?
>>>>>
>>>>> 2016-06-09 13:01 GMT-07:00 SRK <swethakasire...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> How can I insert data into 2000 partitions (directories) of
>>>>>> ORC/Parquet at a time using Spark SQL? It does not seem to be
>>>>>> performant when I try to insert 2000 directories of Parquet/ORC
>>>>>> using Spark SQL. Has anyone faced this issue?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-09 Thread swetha kasireddy
No, I am reading the data from HDFS, transforming it, registering the data
in a temp table using registerTempTable and then doing insert overwrite
using Spark SQL's HiveContext.

On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh 
wrote:

> how are you doing the insert? from an existing table?
>
> Dr Mich Talebzadeh
>
> On 9 June 2016 at 21:16, Stephen Boesch  wrote:
>
>> How many workers (/cpu cores) are assigned to this job?
>>
>> 2016-06-09 13:01 GMT-07:00 SRK :
>>
>>> Hi,
>>>
>>> How can I insert data into 2000 partitions (directories) of ORC/Parquet
>>> at a time using Spark SQL? It does not seem to be performant when I try
>>> to insert 2000 directories of Parquet/ORC using Spark SQL. Has anyone
>>> faced this issue?
>>>
>>> Thanks!
>>>
>>
>


Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-09 Thread swetha kasireddy
400 cores are assigned to this job.

On Thu, Jun 9, 2016 at 1:16 PM, Stephen Boesch  wrote:

> How many workers (/cpu cores) are assigned to this job?
>
> 2016-06-09 13:01 GMT-07:00 SRK :
>
>> Hi,
>>
>> How can I insert data into 2000 partitions (directories) of ORC/Parquet
>> at a time using Spark SQL? It does not seem to be performant when I try
>> to insert 2000 directories of Parquet/ORC using Spark SQL. Has anyone
>> faced this issue?
>>
>> Thanks!
>>
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread swetha kasireddy
I am currently doing option 1 using the following, and it takes a lot of time.
What's the advantage of doing option 2, and how do I do it?

sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
stored as ORC LOCATION '/user/users' TBLPROPERTIES ('orc.compress'='SNAPPY') ")
  sqlContext.sql(
""" from recordsTemp ps   insert overwrite table users
partition(datePartition , idPartition )  select ps.id, ps.record ,
ps.datePartition, ps.idPartition  """.stripMargin)

On Sun, May 22, 2016 at 12:47 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Two alternatives for this ETL or ELT:
>
>
> 1. There is only one external ORC table and you do insert overwrite
>    into that external table through Spark SQL, or
> 2. 14k files loaded into staging area/read directory and then insert
>    overwrite into an ORC table and th
>
>
>
> Dr Mich Talebzadeh
>
> On 22 May 2016 at 20:38, swetha kasireddy <swethakasire...@gmail.com>
> wrote:
>
>> Around 14000 partitions need to be loaded every hour. Yes, I tested this
>> and it's taking a lot of time to load. A partition looks something like
>> the following, which is further partitioned by userId, with all the
>> userRecords for that date inside it.
>>
>> 5 2016-05-20 16:03 /user/user/userRecords/dtPartitioner=2012-09-12
>>
>> On Sun, May 22, 2016 at 12:30 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> By partition, do you mean 14000 files loaded in each batch session (say
>>> daily)?
>>>
>>> Have you actually tested this?
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 22 May 2016 at 20:24, swetha kasireddy <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> The data is not very big, say 1 MB to 10 MB at most per partition. What
>>>> is the best way to insert these 14k partitions with decent performance?
>>>>
>>>> On Sun, May 22, 2016 at 12:18 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> The acid question is how many rows you are going to insert in a batch
>>>>> session. BTW, if this is purely an SQL operation then you can do all
>>>>> that in Hive running on the Spark engine. It will be very fast as well.
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On 22 May 2016 at 20:14, Jörn Franke <jornfra...@gmail.com> wrote:
>>>>>
>>>>>> 14000 partitions seem to be way too many to be performant (except for
>>>>>> large data sets). How much data does one partition contain?
>>>>>>
>>>>>> > On 22 May 2016, at 09:34, SRK <swethakasire...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi,
>>>>>> >
>>>>>> > In my Spark SQL query to insert data, I have around 14,000
>>>>>> > partitions of data which seems to be causing memory issues. How can
>>>>>> > I insert the data for 100 partitions at a time to avoid any memory
>>>>>> > issues?
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-for-100-partitions-at-a-time-using-Spark-SQL-tp26997.html
>>>>>> > Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>> >
>>>>>
>>>>
>>>
>>
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread swetha kasireddy
Around 14000 partitions need to be loaded every hour. Yes, I tested this
and it's taking a lot of time to load. A partition looks something like
the following, which is further partitioned by userId, with all the
userRecords for that date inside it.

5 2016-05-20 16:03 /user/user/userRecords/dtPartitioner=2012-09-12

On Sun, May 22, 2016 at 12:30 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> By partition, do you mean 14000 files loaded in each batch session (say
> daily)?
>
> Have you actually tested this?
>
> Dr Mich Talebzadeh
>
> On 22 May 2016 at 20:24, swetha kasireddy <swethakasire...@gmail.com>
> wrote:
>
>> The data is not very big, say 1 MB to 10 MB at most per partition. What is
>> the best way to insert these 14k partitions with decent performance?
>>
>> On Sun, May 22, 2016 at 12:18 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> The acid question is how many rows you are going to insert in a batch
>>> session. BTW, if this is purely an SQL operation then you can do all that
>>> in Hive running on the Spark engine. It will be very fast as well.
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 22 May 2016 at 20:14, Jörn Franke <jornfra...@gmail.com> wrote:
>>>
>>>> 14000 partitions seem to be way too many to be performant (except for
>>>> large data sets). How much data does one partition contain?
>>>>
>>>> > On 22 May 2016, at 09:34, SRK <swethakasire...@gmail.com> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > In my Spark SQL query to insert data, I have around 14,000 partitions
>>>> > of data which seems to be causing memory issues. How can I insert the
>>>> > data for 100 partitions at a time to avoid any memory issues?
>>>> >
>>>
>>
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread swetha kasireddy
The data is not very big, say 1 MB to 10 MB at most per partition. What is
the best way to insert these 14k partitions with decent performance?

On Sun, May 22, 2016 at 12:18 PM, Mich Talebzadeh  wrote:

> The acid question is how many rows you are going to insert in a batch
> session. BTW, if this is purely an SQL operation then you can do all that
> in Hive running on the Spark engine. It will be very fast as well.
>
>
>
> Dr Mich Talebzadeh
>
> On 22 May 2016 at 20:14, Jörn Franke  wrote:
>
>> 14000 partitions seem to be way too many to be performant (except for
>> large data sets). How much data does one partition contain?
>>
>> > On 22 May 2016, at 09:34, SRK  wrote:
>> >
>> > Hi,
>> >
>> > In my Spark SQL query to insert data, I have around 14,000 partitions of
>> > data which seems to be causing memory issues. How can I insert the data
>> > for 100 partitions at a time to avoid any memory issues?
>> >
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread swetha kasireddy
So, if I insert 1000 records at a time and the next 1000 records include
some records that belong to the same partition as earlier records, then the
data will be overwritten. How can I prevent overwriting valid data in this
case? Could you post the example that you are talking about?

In the final insert into the ORC table, I do an insert overwrite. So I need
a way to insert all the data related to one partition at a time, so that it
is not overwritten when I insert the next set of records.
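
One way to read this requirement is that batches should be cut by partition key rather than by record count, so that every row of a given partition lands in the same batch. The sketch below illustrates only that grouping idea on an in-memory Seq to stay self-contained; the Rec type and the helper name are made up for illustration, with field names borrowed from the table definition in this thread. It assumes that a dynamic-partition insert overwrite replaces only the partitions that actually receive rows, which is worth verifying on the Hive/Spark versions in use.

```scala
// Illustrative record type; field names follow the table definition in this thread.
final case class Rec(id: String, record: String, datePartition: String, idPartition: String)

// Groups records by their (datePartition, idPartition) key and then batches
// whole partitions together, so no partition is split across two batches.
def batchesOfWholePartitions(recs: Seq[Rec], partitionsPerBatch: Int): Seq[Seq[Rec]] = {
  require(partitionsPerBatch > 0, "partitionsPerBatch must be positive")
  recs
    .groupBy(r => (r.datePartition, r.idPartition)) // all rows of one partition end up together
    .toSeq
    .sortBy(_._1)                                   // deterministic batch order
    .grouped(partitionsPerBatch)                    // e.g. 100 partitions per batch
    .map(_.flatMap(_._2))                           // drop the keys, keep the rows
    .toList
}
```

On a real DataFrame the same idea would be applied to the distinct partition keys rather than to the rows themselves.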

On Sun, May 22, 2016 at 11:51 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> OK, is the staging table used as staging only?
>
> You can create a staging *directory* where you put your data (you
> can put 100s of files there) and do an insert/select that will take data
> from 100 files into your main ORC table.
>
> I have an example of 100s of CSV files being inserted/selected from a
> staging external table into an ORC table.
>
> My point is that you are more likely interested in doing analysis on the
> ORC table (read: internal) rather than on the staging table.
>
> HTH
>
> Dr Mich Talebzadeh
>
> On 22 May 2016 at 19:43, swetha kasireddy <swethakasire...@gmail.com>
> wrote:
>
>> But, how do I take 100 partitions at a time from staging table?
>>
>> On Sun, May 22, 2016 at 11:26 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> ok so you still keep data as ORC in Hive for further analysis
>>>
>>> what I have in mind is to have an external table as staging table and do
>>> insert into an orc internal table which is bucketed and partitioned.
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 22 May 2016 at 19:11, swetha kasireddy <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> I am looking at ORC. I insert the data using the following query.
>>>>
>>>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id
>>>> STRING,
>>>> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
>>>> stored as ORC LOCATION '/user/users' TBLPROPERTIES ('orc.compress'='SNAPPY') ")
>>>>   sqlContext.sql(
>>>> """ from recordsTemp ps   insert overwrite table users
>>>> partition(datePartition , idPartition )  select ps.id, ps.record ,
>>>> ps.datePartition, ps.idPartition  """.stripMargin)
>>>>
>>>> On Sun, May 22, 2016 at 12:37 AM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Where is your base table and what format is it (Parquet, ORC, etc.)?
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On 22 May 2016 at 08:34, SRK <swethakasire...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> In my Spark SQL query to insert data, I have around 14,000 partitions
>>>>>> of data which seems to be causing memory issues. How can I insert the
>>>>>> data for 100 partitions at a time to avoid any memory issues?
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread swetha kasireddy
But, how do I take 100 partitions at a time from staging table?
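
One possible way to take the partitions in groups of 100 is to collect the distinct partition values first and then issue one insert per group, filtering the source on that group. The sketch below only builds the SQL strings, so it runs without a cluster. The helper name batchedInserts and its parameters are hypothetical, the column names are taken from the insert query used in this thread, and the restriction on partition values is an assumption made to keep the quoting simple.

```scala
// Builds one INSERT OVERWRITE statement per batch of datePartition values.
// Hypothetical helper; column names follow the query used in this thread.
def batchedInserts(source: String, target: String,
                   datePartitions: Seq[String], batchSize: Int): Seq[String] = {
  require(batchSize > 0, "batchSize must be positive")
  // Assumption: values are simple tokens such as dates, so plain quoting is safe.
  require(datePartitions.forall(_.matches("[A-Za-z0-9_-]+")), "unexpected partition value")
  datePartitions.distinct.sorted.grouped(batchSize).map { batch =>
    val inList = batch.map(v => s"'$v'").mkString(", ")
    s"""FROM $source ps
       |INSERT OVERWRITE TABLE $target PARTITION (datePartition, idPartition)
       |SELECT ps.id, ps.record, ps.datePartition, ps.idPartition
       |WHERE ps.datePartition IN ($inList)""".stripMargin
  }.toList
}
```

Each generated statement could then be passed to sqlContext.sql in turn, with a batch size of 100.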

On Sun, May 22, 2016 at 11:26 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> ok so you still keep data as ORC in Hive for further analysis
>
> what I have in mind is to have an external table as staging table and do
> insert into an orc internal table which is bucketed and partitioned.
>
>
>
> Dr Mich Talebzadeh
>
> On 22 May 2016 at 19:11, swetha kasireddy <swethakasire...@gmail.com>
> wrote:
>
>> I am looking at ORC. I insert the data using the following query.
>>
>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
>> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
>> stored as ORC LOCATION '/user/users' TBLPROPERTIES ('orc.compress'='SNAPPY') ")
>>   sqlContext.sql(
>> """ from recordsTemp ps   insert overwrite table users
>> partition(datePartition , idPartition )  select ps.id, ps.record ,
>> ps.datePartition, ps.idPartition  """.stripMargin)
>>
>> On Sun, May 22, 2016 at 12:37 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Where is your base table and what format is it (Parquet, ORC, etc.)?
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 22 May 2016 at 08:34, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> In my Spark SQL query to insert data, I have around 14,000 partitions of
>>>> data which seems to be causing memory issues. How can I insert the data
>>>> for 100 partitions at a time to avoid any memory issues?
>>>>
>>>
>>
>


Re: How to insert data for 100 partitions at a time using Spark SQL

2016-05-22 Thread swetha kasireddy
I am looking at ORC. I insert the data using the following query.

sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
stored as ORC LOCATION '/user/users' ")
  sqlContext.sql("  orc.compress= SNAPPY")
  sqlContext.sql(
""" from recordsTemp ps   insert overwrite table users
partition(datePartition , idPartition )  select ps.id, ps.record ,
ps.datePartition, ps.idPartition  """.stripMargin)
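
One way to limit how many partitions a single statement touches is to list the distinct partition values first and then run one INSERT per batch, restricting each with a WHERE clause. Below is a minimal sketch of that idea, not a tested solution. The object and function names are hypothetical, the batch size of 100 comes from the question, the target table is assumed to be `users` as in the query above, and batching on datePartition alone is an assumption (batch on whichever column or pair of columns actually bounds the partition count).

```scala
// Sketch only: builds one INSERT statement per batch of datePartition values.
object BatchedInsert {
  // Split the distinct partition values into groups of at most batchSize.
  def batches(values: Seq[String], batchSize: Int): Seq[Seq[String]] =
    values.grouped(batchSize).toList

  // Build the dynamic-partition insert restricted to one batch of dates.
  def insertSql(dates: Seq[String]): String = {
    val inList = dates.map(d => s"'$d'").mkString(", ")
    s"""FROM recordsTemp ps
       |INSERT OVERWRITE TABLE users PARTITION (datePartition, idPartition)
       |SELECT ps.id, ps.record, ps.datePartition, ps.idPartition
       |WHERE ps.datePartition IN ($inList)""".stripMargin
  }
}
```

Each generated string would then be passed to sqlContext.sql in turn, for example BatchedInsert.batches(allDates, 100).foreach(b => sqlContext.sql(BatchedInsert.insertSql(b))), where allDates is the (hypothetical) list of distinct datePartition values collected from recordsTemp.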

On Sun, May 22, 2016 at 12:37 AM, Mich Talebzadeh  wrote:

> where is your base table and what format is it Parquet, ORC etc)
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 May 2016 at 08:34, SRK  wrote:
>
>> Hi,
>>
>> In my Spark SQL query to insert data, I have around 14,000 partitions of
>> data which seems to be causing memory issues. How can I insert the data
>> for
>> 100 partitions at a time to avoid any memory issues?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-for-100-partitions-at-a-time-using-Spark-SQL-tp26997.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Memory issues when trying to insert data in the form of ORC using Spark SQL

2016-05-20 Thread swetha kasireddy
Also, the Spark SQL insert seems to take only two tasks per stage. That
might be the reason why it does not have sufficient memory. Is there a way
to increase the number of tasks when doing the sql insert?

Stage Id: 12
Description: save at SaveUsersToHdfs.scala:255
Submitted: 2016/05/20 16:32:47
Duration: 5.0 min
Tasks (Succeeded/Total): 0/2
Input / Output / Shuffle Read / Shuffle Write: 21.4 MB (only one value survived; its column is unclear)

On Fri, May 20, 2016 at 3:43 PM, SRK  wrote:

>
> Hi,
>
> I see some memory issues when trying to insert the data in the form of ORC
> using Spark SQL. Please find the query and exception below. Any idea as to
> why this is happening?
>
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
> stored as ORC LOCATION '/user/users' ")
>   sqlContext.sql("  orc.compress= SNAPPY")
>   sqlContext.sql(
> """ from recordsTemp ps   insert overwrite table users
> partition(datePartition , idPartition )  select ps.id, ps.record ,
> ps.datePartition, ps.idPartition  """.stripMargin)
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 13.0org.apache.hadoop.hive.ql.metadata.HiveException:
> parquet.hadoop.MemoryManager$1: New Memory allocation 1048575 bytes is
> smaller than the minimum allocation size of 1048576 bytes.
> at
>
> org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
> at
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org
> $apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:240)
> at
>
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
> at
>
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
> at
> scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
> at
> scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
> at
>
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:249)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org
> $apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: parquet.hadoop.MemoryManager$1: New Memory allocation 1048575
> bytes is smaller than the minimum allocation size of 1048576 bytes.
> at
> parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
> at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
> at
> parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:104)
> at
>
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
> at
>
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
> at
>
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.(ParquetRecordWriterWrapper.java:65)
> at
>
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
> at
>
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
> at
>
> 

Re: Unit testing framework for Spark Jobs?

2016-05-18 Thread swetha kasireddy
Hi Lars,

Do you have any examples for the methods that you described for Spark batch
and Streaming?

Thanks!
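
For the streaming case, the "poll with a short sleep and a long timeout" advice in the quoted message below could be sketched roughly as follows. This is a hand-rolled illustration, not Lars's code; the object and method names are hypothetical. ScalaTest's org.scalatest.concurrent.Eventually, mentioned in the quoted reply, is the ready-made alternative.

```scala
import scala.annotation.tailrec

// Sketch only: poll a condition until it holds or a deadline passes.
object Poll {
  // Returns true as soon as `condition` holds, false once `timeoutMs` has elapsed.
  def waitUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L

    @tailrec
    def loop(): Boolean =
      if (condition) true
      else if (System.nanoTime() >= deadline) false
      else { Thread.sleep(intervalMs); loop() }

    loop()
  }
}
```

In a test, the condition would be something like "the expected row has appeared in Cassandra", with a timeout of around 30 seconds and an interval of a few milliseconds, and the test would fail if waitUntil returns false.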

On Wed, Mar 30, 2016 at 2:41 AM, Lars Albertsson  wrote:

> Thanks!
>
> It is on my backlog to write a couple of blog posts on the topic, and
> eventually some example code, but I am currently busy with clients.
>
> Thanks for the pointer to Eventually - I was unaware. Fast exit on
> exception would be a useful addition, indeed.
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
> On Mon, Mar 28, 2016 at 2:00 PM, Steve Loughran 
> wrote:
> > this is a good summary -Have you thought of publishing it at the end of
> a URL for others to refer to
> >
> >> On 18 Mar 2016, at 07:05, Lars Albertsson  wrote:
> >>
> >> I would recommend against writing unit tests for Spark programs, and
> >> instead focus on integration tests of jobs or pipelines of several
> >> jobs. You can still use a unit test framework to execute them. Perhaps
> >> this is what you meant.
> >>
> >> You can use any of the popular unit test frameworks to drive your
> >> tests, e.g. JUnit, Scalatest, Specs2. I prefer Scalatest, since it
> >> gives you choice of TDD vs BDD, and it is also well integrated with
> >> IntelliJ.
> >>
> >> I would also recommend against using testing frameworks tied to a
> >> processing technology, such as Spark Testing Base. Although it does
> >> seem well crafted, and makes it easy to get started with testing,
> >> there are drawbacks:
> >>
> >> 1. I/O routines are not tested. Bundled test frameworks typically do
> >> not materialise datasets on storage, but pass them directly in memory.
> >> (I have not verified this for Spark Testing Base, but it looks so.)
> >> I/O routines are therefore not exercised, and they often hide bugs,
> >> e.g. related to serialisation.
> >>
> >> 2. You create a strong coupling between processing technology and your
> >> tests. If you decide to change processing technology (which can happen
> >> soon in this fast paced world...), you need to rewrite your tests.
> >> Therefore, during a migration process, the tests cannot detect bugs
> >> introduced in migration, and help you migrate fast.
> >>
> >> I recommend that you instead materialise input datasets on local disk,
> >> run your Spark job, which writes output datasets to local disk, read
> >> output from disk, and verify the results. You can still use Spark
> >> routines to read and write input and output datasets. A Spark context
> >> is expensive to create, so for speed, I would recommend reusing the
> >> Spark context between input generation, running the job, and reading
> >> output.
> >>
> >> This is easy to set up, so you don't need a dedicated framework for
> >> it. Just put your common boilerplate in a shared test trait or base
> >> class.
> >>
> >> In the future, when you want to replace your Spark job with something
> >> shinier, you can still use the old tests, and only replace the part
> >> that runs your job, giving you some protection from regression bugs.
> >>
> >>
> >> Testing Spark Streaming applications is a different beast, and you can
> >> probably not reuse much from your batch testing.
> >>
> >> For testing streaming applications, I recommend that you run your
> >> application inside a unit test framework, e.g, Scalatest, and have the
> >> test setup create a fixture that includes your input and output
> >> components. For example, if your streaming application consumes from
> >> Kafka and updates tables in Cassandra, spin up single node instances
> >> of Kafka and Cassandra on your local machine, and connect your
> >> application to them. Then feed input to a Kafka topic, and wait for
> >> the result to appear in Cassandra.
> >>
> >> With this setup, your application still runs in Scalatest, the tests
> >> run without custom setup in maven/sbt/gradle, and you can easily run
> >> and debug inside IntelliJ.
> >>
> >> Docker is suitable for spinning up external components. If you use
> >> Kafka, the Docker image spotify/kafka is useful, since it bundles
> >> Zookeeper.
> >>
> >> When waiting for output to appear, don't sleep for a long time and
> >> then check, since it will slow down your tests. Instead enter a loop
> >> where you poll for the results and sleep for a few milliseconds in
> >> between, with a long timeout (~30s) before the test fails with a
> >> timeout.
> >
> > org.scalatest.concurrent.Eventually is your friend there
> >
> > eventually(stdTimeout, stdInterval) {
> >   listRestAPIApplications(connector, webUI, true) should contain(expectedAppId)
> > }
> >
> > It has good exponential backoff, for fast initial success without using
> too much CPU later, and is simple to use
> >
> > If it has weaknesses in my tests, they are
> >
> > 1. it will retry on all exceptions, rather than assertions. If there's a
> bug in the test code then it manifests as a timeout. ( I think I could play
> with 

Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread swetha kasireddy
OK. Thanks Cody!
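
For anyone reading along, here is a rough sketch of where the settings named in the reply quoted below would go. The values are illustrative only, not recommendations, the broker address is a placeholder, and the object name is hypothetical.

```scala
// Sketch only: where the settings named in the quoted reply would be set.
object KafkaDirectSettings {
  // Passed as kafkaParams when creating the direct stream (broker address is a placeholder).
  val kafkaParams: Map[String, String] = Map(
    "metadata.broker.list" -> "broker1:9092",
    "refresh.leader.backoff.ms" -> "2000"
  )

  // Set on SparkConf (or via --conf); the values here are illustrative only.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.maxRetries" -> "3",
    "spark.task.maxFailures" -> "4"
  )

  // Rough upper bound on the time the driver spends sleeping between offset lookups.
  def maxDriverBackoffMs: Long =
    sparkSettings("spark.streaming.kafka.maxRetries").toLong *
      kafkaParams("refresh.leader.backoff.ms").toLong
}
```

With these example values the driver would sleep for roughly 6 seconds in total before giving up on fetching offsets, which is only useful if the Kafka-side leader election normally completes within that window.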

On Fri, Apr 29, 2016 at 12:41 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If worker to broker communication breaks down, the worker will sleep
> for refresh.leader.backoff.ms before throwing an error, at which point
> normal spark task retry (spark.task.maxFailures) comes into play.
>
> If driver to broker communication breaks down, the driver will sleep
> for refresh.leader.backoff.ms before retrying the attempt to get
> offsets, up to spark.streaming.kafka.maxRetries number of times.
>
> The actual leader rebalancing process is entirely up to Kafka, which
> is why I'm saying if you're losing leaders, you should look at Kafka.
>
> On Fri, Apr 29, 2016 at 11:21 AM, swetha kasireddy
> <swethakasire...@gmail.com> wrote:
> > OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> > default for rebalancing and they say that refresh.leader.backoff.ms of
> 200
> > to refresh leader is very aggressive and suggested us to increase it to
> > 2000. Even after increasing to 2500 I still get Leader Lost Errors.
> >
> > Is  refresh.leader.backoff.ms the right setting in the app for it to
> wait
> > till the leader election and rebalance is done from the Kafka side
> assuming
> > that Kafka has  rebalance.backoff.ms of 2000  ?
> >
> > Also, does Spark Kafka Direct try to restart the app when the leader is
> lost
> > or it will just wait till  refresh.leader.backoff.ms   and then retry
> again
> > depending on the number of retries?
> >
> > On Fri, Apr 29, 2016 at 8:14 AM, swetha kasireddy
> > <swethakasire...@gmail.com> wrote:
> >>
> >> OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> >> default for rebalancing and they say that refresh.leader.backoff.ms of
> 200
> >> to refresh leader is very aggressive and suggested us to increase it to
> >> 2000. Even after increasing to 2500 I still get Leader Lost Errors.
> >>
> >> Is  refresh.leader.backoff.ms the right setting in the app for it to
> wait
> >> till the leader election and rebalance is done from the Kafka side
> assuming
> >> that Kafka has  rebalance.backoff.ms of 2000  ?
> >>
> >> On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger <c...@koeninger.org>
> >> wrote:
> >>>
> >>> Seems like it'd be better to look into the Kafka side of things to
> >>> determine why you're losing leaders frequently, as opposed to trying
> >>> to put a bandaid on it.
> >>>
> >>> On Wed, Apr 27, 2016 at 11:49 AM, SRK <swethakasire...@gmail.com>
> wrote:
> >>> > Hi,
> >>> >
> >>> > We seem to be getting a lot of LeaderLostExceptions and our source
> >>> > Stream is
> >>> > working with a default value of rebalance.backoff.ms which is 2000.
> I
> >>> > was
> >>> > thinking to increase this value to 5000. Any suggestions on  this?
> >>> >
> >>> > Thanks!
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > View this message in context:
> >>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
> >>> > Sent from the Apache Spark User List mailing list archive at
> >>> > Nabble.com.
> >>> >
> >>> >
> >>
> >>
> >
>


Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread swetha kasireddy
OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
default for rebalancing and they say that refresh.leader.backoff.ms of 200
to refresh leader is very aggressive and suggested us to increase it to
2000. Even after increasing to 2500 I still get Leader Lost Errors.

Is  refresh.leader.backoff.ms the right setting in the app for it to wait
till the leader election and rebalance is done from the Kafka side assuming
that Kafka has  rebalance.backoff.ms of 2000  ?

Also, does Spark Kafka Direct try to restart the app when the leader is
lost or it will just wait till  refresh.leader.backoff.ms   and then retry
again depending on the number of retries?

On Fri, Apr 29, 2016 at 8:14 AM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
> default for rebalancing and they say that refresh.leader.backoff.ms of
> 200 to refresh leader is very aggressive and suggested us to increase it to
> 2000. Even after increasing to 2500 I still get Leader Lost Errors.
>
> Is  refresh.leader.backoff.ms the right setting in the app for it to wait
> till the leader election and rebalance is done from the Kafka side assuming
> that Kafka has  rebalance.backoff.ms of 2000  ?
>
> On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Seems like it'd be better to look into the Kafka side of things to
>> determine why you're losing leaders frequently, as opposed to trying
>> to put a bandaid on it.
>>
>> On Wed, Apr 27, 2016 at 11:49 AM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > We seem to be getting a lot of LeaderLostExceptions and our source
>> Stream is
>> > working with a default value of rebalance.backoff.ms which is 2000. I
>> was
>> > thinking to increase this value to 5000. Any suggestions on  this?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> >
>>
>
>


Re: What is the default value of rebalance.backoff.ms in Spark Kafka Direct?

2016-04-29 Thread swetha kasireddy
OK. So the Kafka side they use rebalance.backoff.ms of 2000 which is a
default for rebalancing and they say that refresh.leader.backoff.ms of 200
to refresh leader is very aggressive and suggested us to increase it to
2000. Even after increasing to 2500 I still get Leader Lost Errors.

Is  refresh.leader.backoff.ms the right setting in the app for it to wait
till the leader election and rebalance is done from the Kafka side assuming
that Kafka has  rebalance.backoff.ms of 2000  ?

On Wed, Apr 27, 2016 at 11:05 AM, Cody Koeninger  wrote:

> Seems like it'd be better to look into the Kafka side of things to
> determine why you're losing leaders frequently, as opposed to trying
> to put a bandaid on it.
>
> On Wed, Apr 27, 2016 at 11:49 AM, SRK  wrote:
> > Hi,
> >
> > We seem to be getting a lot of LeaderLostExceptions and our source
> Stream is
> > working with a default value of rebalance.backoff.ms which is 2000. I
> was
> > thinking to increase this value to 5000. Any suggestions on  this?
> >
> > Thanks!
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-default-value-of-rebalance-backoff-ms-in-Spark-Kafka-Direct-tp26840.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>


Re: How to add an accumulator for a Set in Spark

2016-03-19 Thread swetha kasireddy
OK. I did take a look at them. So once I have an accumulator for a HashSet,
how can I check whether a particular key is already present in the HashSet
accumulator? I don't see any .contains method there. My requirement is that
I need to keep accumulating the keys in the HashSet across all the tasks on
the various nodes and use it to check whether a key is already present in
the HashSet.

On Tue, Mar 15, 2016 at 9:56 PM, pppsunil  wrote:

> Have you looked at using the Accumulable interface? Take a look at the
> Spark documentation at
> http://spark.apache.org/docs/latest/programming-guide.html#accumulators
> It gives an example of how to use a vector type for an accumulator, which
> might be very close to what you need.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-an-accumulator-for-a-Set-in-Spark-tp26510p26514.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: How to control the number of parquet files getting created under a partition ?

2016-03-02 Thread swetha kasireddy
Thanks. I tried this yesterday and it seems to be working.
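
To spell out the arithmetic from the reply quoted below: the number of files is bounded by the number of DataFrame partitions multiplied by the number of table partition values. A small sketch, with hypothetical names, assuming every DataFrame partition holds rows for every table partition value (the worst case):

```scala
// Sketch only: the arithmetic behind the file counts described in the quoted reply.
object ParquetFileCount {
  // Upper bound on files per table partition: each DataFrame partition can
  // write at most one file into each table partition it holds rows for.
  def maxFilesPerTablePartition(dataFramePartitions: Int): Int = dataFramePartitions

  // Upper bound on total files across all table partitions.
  def maxTotalFiles(dataFramePartitions: Int, tablePartitionValues: Int): Int =
    dataFramePartitions * tablePartitionValues
}
```

So 10 DataFrame partitions and 3 partition values give at most 30 files, and after coalesce(2) the same bound drops to 2 files per table partition.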

On Wed, Mar 2, 2016 at 1:49 AM, James Hammerton  wrote:

> Hi,
>
> Based on the behaviour I've seen using parquet, the number of partitions
> in the DataFrame will determine the number of files in each parquet
> partition.
>
> I.e. when you use "PARTITION BY" you're actually partitioning twice, once
> via the partitions spark has created internally and then again with the
> partitions you specify in the "PARTITION BY" clause.
>
> So if you have 10 partitions in your DataFrame, and save that as a parquet
> file or table partitioned on a column with 3 values, you'll get 30
> partitions, 10 per parquet partition.
>
> You can reduce the number of partitions in the DataFrame by using
> coalesce() before saving the data.
>
> Regards,
>
> James
>
>
> On 1 March 2016 at 21:01, SRK  wrote:
>
>> Hi,
>>
>> How can I control the number of parquet files getting created under a
>> partition? I have my sqlContext queries to create a table and insert the
>> records as follows. It seems to create around 250 parquet files under each
>> partition though I was expecting that to create around 2 or 3 files. Due
>> to
>> the large number of files, it takes a lot of time to scan the records. Any
>> suggestions as to how to control the number of parquet files under each
>> partition would be of great help.
>>
>>  sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts
>> (userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING)
>> stored as PARQUET LOCATION '/user/testId/testUserDts' ")
>>
>>   sqlContext.sql(
>> """from testUserDtsTemp ps   insert overwrite table testUserDts
>> partition(partitioner)  select ps.userId, ps.savedDate ,  ps.partitioner
>> """.stripMargin)
>>
>>
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-the-number-of-parquet-files-getting-created-under-a-partition-tp26374.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: How to join multiple tables and use subqueries in Spark SQL using sqlContext?

2016-02-23 Thread swetha kasireddy
It seems to be failing when I do something like the following in both
sqlContext and hiveContext:


sqlContext.sql("SELECT ssd.savedDate from saveSessionDatesRecs ssd
where ssd.partitioner in (SELECT sr1.partitioner  from
sparkSessionRecords1 sr1))")
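
As far as I know, Spark SQL releases before 2.0 did not accept IN (SELECT ...) subqueries in a WHERE clause, which may be what is failing here (an assumption, since the error message is not shown). A LEFT SEMI JOIN expresses the same filter and is a possible workaround. A minimal sketch, with a hypothetical object name and the table and column names taken from the query above:

```scala
// Sketch only: the same filter written as a LEFT SEMI JOIN instead of IN (SELECT ...).
object SemiJoinQuery {
  val sql: String =
    """SELECT ssd.savedDate
      |FROM saveSessionDatesRecs ssd
      |LEFT SEMI JOIN sparkSessionRecords1 sr1
      |  ON ssd.partitioner = sr1.partitioner""".stripMargin
}
```

In the application this would be run as sqlContext.sql(SemiJoinQuery.sql). A semi join returns each row of saveSessionDatesRecs at most once, so it does not duplicate rows the way a plain inner join could.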


On Tue, Feb 23, 2016 at 5:57 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> These tables are stored in hdfs as parquet. Can sqlContext be applied for
> the subQueries?
>
> On Tue, Feb 23, 2016 at 5:31 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Assuming these are all in Hive, you can either use spark-sql or
>> spark-shell.
>>
>> HiveContext has richer settings compared to SQLContext
>>
>> Have a look at this example of joins among three Hive tables:
>>
>> // sc is an existing SparkContext.
>> val sqlContext  = new org.apache.spark.sql.hive.HiveContext(sc)
>> var sqltext : String = ""
>> sqltext = "use oraclehadoop"
>> sqlContext.sql(sqltext)
>> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
>> HH:mm:ss.ss') AS StartTime").show()
>> println("\n Running the query \n")
>>
>> sqltext = """
>>
>> SELECT c.country_name AS nation,
>>s.year,
>>s.month,
>>sum (amount_sold) AS salesamount
>> FROM  countries c
>> INNER JOIN customers cs
>> ON c.country_id = cs.country_id
>> INNER JOIN sales s
>> ON cs.cust_id = s.cust_id
>> GROUP BY country_name, s.year, s.month
>> ORDER BY country_name, s.year, s.month
>> """
>>
>> sqlContext.sql(sqltext).collect.foreach(println)
>>
>> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
>> HH:mm:ss.ss') AS EndTime").show()
>> sys.exit()
>>
>>
>>
>> HTH
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>
>> On 24/02/2016 01:01, SRK wrote:
>>
>> Hi,
>>
>> How do I join multiple tables and use subqueries in Spark SQL using
>> sqlContext? Can I do this using sqlContext or do I have to use HiveContext
>> for the same?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-multiple-tables-and-use-subqueries-in-Spark-SQL-using-sqlContext-tp26315.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>>
>>
>>
>>
>
>


Re: How to join multiple tables and use subqueries in Spark SQL using sqlContext?

2016-02-23 Thread swetha kasireddy
These tables are stored in hdfs as parquet. Can sqlContext be applied for
the subQueries?

On Tue, Feb 23, 2016 at 5:31 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Assuming these are all in Hive, you can either use spark-sql or
> spark-shell.
>
> HiveContext has richer settings compared to SQLContext
>
> Have a look at this example of joins among three Hive tables:
>
> // sc is an existing SparkContext.
> val sqlContext  = new org.apache.spark.sql.hive.HiveContext(sc)
> var sqltext : String = ""
> sqltext = "use oraclehadoop"
> sqlContext.sql(sqltext)
> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
> HH:mm:ss.ss') AS StartTime").show()
> println("\n Running the query \n")
>
> sqltext = """
>
> SELECT c.country_name AS nation,
>s.year,
>s.month,
>sum (amount_sold) AS salesamount
> FROM  countries c
> INNER JOIN customers cs
> ON c.country_id = cs.country_id
> INNER JOIN sales s
> ON cs.cust_id = s.cust_id
> GROUP BY country_name, s.year, s.month
> ORDER BY country_name, s.year, s.month
> """
>
> sqlContext.sql(sqltext).collect.foreach(println)
>
> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
> HH:mm:ss.ss') AS EndTime").show()
> sys.exit()
>
>
>
> HTH
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 24/02/2016 01:01, SRK wrote:
>
> Hi,
>
> How do I join multiple tables and use subqueries in Spark SQL using
> sqlContext? Can I do this using sqlContext or do I have to use HiveContext
> for the same?
>
> Thanks!
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-multiple-tables-and-use-subqueries-in-Spark-SQL-using-sqlContext-tp26315.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
>
>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-17 Thread swetha kasireddy
So suppose I have a bunch of userIds and I need to save them as parquet in a
database. I also need to load them back and need to be able to do a join
on userId. My idea is to partition by userId hashcode first and then on
userId, so that I don't have to deal with any performance issues caused by
a large number of small files and can also scan faster.


Something like ...df.write.format("parquet").partitionBy( "userIdHash"
, "userId").mode(SaveMode.Append).save("userRecords");

On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> So suppose I have a bunch of userIds and I need to save them as parquet in
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId.
>
>
>
> On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <mich...@databricks.com
> > wrote:
>
>> Can you describe what you are trying to accomplish?  What would the
>> custom partitioner be?
>>
>> On Tue, Feb 16, 2016 at 1:21 PM, SRK <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>


Re: How to use a custom partitioner in a dataframe in Spark

2016-02-17 Thread swetha kasireddy
So suppose I have a bunch of userIds and I need to save them as parquet in
database. I also need to load them back and need to be able to do a join
on userId. My idea is to partition by userId hashcode first and then on
userId.



On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Can you describe what you are trying to accomplish?  What would the custom
> partitioner be?
>
> On Tue, Feb 16, 2016 at 1:21 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-a-custom-partitioner-in-a-dataframe-in-Spark-tp26240.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: How to join an RDD with a hive table?

2016-02-16 Thread swetha kasireddy
How do I use a custom partitioner hashed by userId inside saveAsTable using
a dataframe?

On Mon, Feb 15, 2016 at 11:24 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> How about saving the dataframe as a table partitioned by userId? My User
> records have userId, number of sessions, visit count etc as the columns and
> it should be partitioned by userId. I will need to join the userTable saved
> in the database as follows with an incoming session RDD. The session RDD
> would have a sessionId and  a sessionRecord which has the userId. So,
>  saving the user  data as a table using dataframes partitioned by userId
> and then joining it with session RDDs, needs to be done.  How can I join a
> dataframe saved in hdfs with an incoming RDD so that all the records are
> not read and only the records for which the join conditions are met are
> read?
>
> df.write.partitionBy('userId').saveAsTable(...)
>
>
> On Mon, Feb 15, 2016 at 10:09 AM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>>
>>
>> It depends on how many columns you need from tables for your queries and
>> potential number of rows.
>>
>> From my experience I don't believe that registering a table as temporary
>> means it is going to cache whole 1 billion rows into memory. That does not
>> make sense (I stand corrected). Only a fraction of rows and columns will be
>> needed.
>>
>> It will be interesting to know how Catalyst is handling this. I suspect
>> it behaves much like any data cache in a relational database by having some
>> form of MRU-LRU chain where rows are read into memory from the blocks,
>> processed and discarded to make room for new ones. If the memory is not big
>> enough the operation is spilled to disk.
>>
>> I just did a test on three tables in Hive with Spark 1.5.2 using
>> DataFrames and tempTables
>>
>> The FACT table had 1 billion rows as follows:
>>
>>
>> CREATE TABLE `sales_staging`(
>>   `prod_id` bigint,
>>   `cust_id` bigint,
>>   `time_id` timestamp,
>>   `channel_id` bigint,
>>   `promo_id` bigint,
>>   `quantity_sold` decimal(10,0),
>>   `amount_sold` decimal(10,0))
>> ROW FORMAT SERDE
>>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
>> STORED AS INPUTFORMAT
>>   'org.apache.hadoop.mapred.TextInputFormat'
>> OUTPUTFORMAT
>>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
>> LOCATION
>>   'hdfs://rhes564:9000/user/hive/warehouse/oraclehadoop.db/sales_staging'
>> TBLPROPERTIES (
>>   'COLUMN_STATS_ACCURATE'='true',
>>   'last_modified_by'='hduser',
>>   'last_modified_time'='1451305601',
>>   'numFiles'='4',
>>   'numRows'='10',
>>   'rawDataSize'='46661545000',
>>   'totalSize'='47661545000',
>>   'transient_lastDdlTime'='1451767601')
>>
>>
>>
>> The other dimension tables were tiny. It took 13 minutes to get the first
>> 10 rows back, but that required only a few columns of interest. So I don't
>> think it was loading 1 billion rows into memory from the sales_staging table.
>>
>> Started at
>>
>> [15/02/2016 17:47:28.28]
>>
>> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
>> ti

Re: How to join an RDD with a hive table?

2016-02-15 Thread swetha kasireddy
= t_c.CHANNEL_ID
>
> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ) rs
>
> LIMIT 10
>
>
>
> [1998-01,Direct Sales,1823005210]
>
> [1998-01,Internet,248172522]
>
> [1998-01,Partners,474646900]
>
> [1998-02,Direct Sales,1819659036]
>
> [1998-02,Internet,298586496]
>
> [1998-02,Partners,534103258]
>
> [1998-03,Direct Sales,1405805622]
>
> [1998-03,Internet,229163168]
>
> [1998-03,Partners,352277328]
>
> [1998-03,Tele Sales,59700082]
>
>  Finished at
>
> [15/02/2016 18:00:50.50]
>
>
>
> On 15/02/2016 17:27, swetha kasireddy wrote:
>
> OK. Would it query only the records that I want in Hive, as per the filter,
> or just load the entire table? My user table will have millions of records
> and I do not want to cause OOM errors by loading the entire table in memory.
>
> On Mon, Feb 15, 2016 at 12:51 AM, Mich Talebzadeh <m...@peridale.co.uk>
> wrote:
>
>> It is also worthwhile using temporary tables for the join query.
>>
>>
>>
>> I can join a Hive table with any other JDBC accessed table from any other
>> databases with DF and temporary tables
>>
>>
>>
>> //
>>
>> //Get the FACT table from Hive
>>
>> //
>>
>> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
>> oraclehadoop.sales")
>>
>>
>>
>> //
>>
>> //Get the Dimension table from Oracle via JDBC
>>
>> //
>>
>> val c = HiveContext.load("jdbc",
>>
>> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>>
>> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC
>> FROM sh.channels)",
>>
>> "user" -> "sh",
>>
>> "password" -> "xxx"))
>>
>>
>>
>>
>>
>> s.registerTempTable("t_s")
>>
>> c.registerTempTable("t_c")
>>
>>
>>
>> And do the join
>>
>>
>>
>> val sqltext = """
>>
>> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>>
>> FROM
>>
>> (
>>
>> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS
>> SalesChannel, SUM(t_s.AMOUNT_SOLD) AS TotalSales
>>
>> FROM t_s, t_t, t_c
>>
>> WHERE t_s.TIME_ID = t_t.TIME_ID
>>
>> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>>
>> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>
>> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>>
>> ) rs
>>
>> LIMIT 1000
>>
>> """
>>
>> HiveContext.sql(sqltext).collect.foreach(println)
>>
>>
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> NOTE: The information in this email is proprietary and confidential. This
>> message is for the designated recipient only, if you are not the intended
>> recipient, you should destroy it immediately. Any information in this
>> message shall not be understood as given or endorsed by Peridale Technology
>> Ltd, its subsidiaries or their employees, unless expressly so stated. It is
>> the responsibility of the recipient to ensure that this email is virus
>> free, therefore neither Peridale Technology Ltd, its subsidiaries nor their
>> employees accept any responsibility.
>>
>>
>>
>>
>>
>> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
>> *Sent:* 15 February 2016 08:44
>> *To:* SRK <swethakasire...@gmail.com>
>> *Cc:* user <user@spark.apache.org>
>> *Subject:* Re: How to join an RDD with a hive table?
>>
>>
>>
>> Have you tried creating a DataFrame from the RDD and join with DataFrame
>> which corresponds to the hive table ?
>>
>>
>>
>> On Sun, Feb 14, 2016 at 9:53 PM, SRK <swethakasire...@gmail.com> wrote:
>>
>> Hi,
>>
>> How to join an RDD with a Hive table and retrieve only the records that I
>> am interested in? Suppose I have an RDD that has 1000 records and there is
>> a Hive table with 100,000 records. I should be able to join the RDD with
>> the Hive table by an Id and load only those 1000 records from the Hive
>> table so that there are no memory issues. Also, I was planning on storing
>> the data in Hive in the form of Parquet files. Any help on this is greatly
>> appreciated.
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-an-RDD-with-a-hive-table-tp26225.html
>>
>>
>>
>
>
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Cloud Technology Partners 
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
> the responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Cloud Technology partners Ltd, its subsidiaries nor their 
> employees accept any responsibility.
>
>
>


Re: How to join an RDD with a hive table?

2016-02-15 Thread swetha kasireddy
OK. Would it query only the records that I want in Hive, as per the filter,
or just load the entire table? My user table will have millions of records
and I do not want to cause OOM errors by loading the entire table in memory.
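
On whether a filter avoids reading the whole table: to my understanding, Spark evaluates DataFrames lazily, and when the table is partitioned on the filter column it can skip whole partition directories rather than scan everything. The plain-Scala sketch below only models that pruning step over file paths. The `userId=<value>` directory layout, the example paths, and the names `PartitionPruningSketch` and `prune` are assumptions for illustration, not Spark or Hive API.

```scala
object PartitionPruningSketch {
  // Assumed layout of a table partitioned on userId:
  //   <root>/userId=<value>/<file>
  private val PartitionFile = """.*/userId=([^/]+)/[^/]+""".r

  // Keep only the files whose partition value is among the wanted ids.
  def prune(files: Seq[String], wantedUserIds: Set[String]): Seq[String] =
    files.filter {
      case PartitionFile(userId) => wantedUserIds.contains(userId)
      case _                     => false // not a partition file: never read
    }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      "/warehouse/users/userId=42/part-0.parquet",
      "/warehouse/users/userId=7/part-0.parquet",
      "/warehouse/users/userId=99/part-0.parquet")
    prune(files, Set("42", "99")).foreach(println)
  }
}
```

As far as I know, in Spark 1.x a join against an RDD does not by itself tell the reader which partitions to skip, so the wanted ids would have to be turned into an explicit filter on the partition column first.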

On Mon, Feb 15, 2016 at 12:51 AM, Mich Talebzadeh 
wrote:

> It is also worthwhile using temporary tables for the join query.
>
>
>
> I can join a Hive table with any other JDBC accessed table from any other
> databases with DF and temporary tables
>
>
>
> //
>
> //Get the FACT table from Hive
>
> //
>
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> oraclehadoop.sales")
>
>
>
> //
>
> //Get the Dimension table from Oracle via JDBC
>
> //
>
> val c = HiveContext.load("jdbc",
>
> Map("url" -> "jdbc:oracle:thin:@rhes564:1521:mydb",
>
> "dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
> sh.channels)",
>
> "user" -> "sh",
>
> "password" -> "xxx"))
>
>
>
>
>
> s.registerTempTable("t_s")
>
> c.registerTempTable("t_c")
>
>
>
> And do the join
>
>
>
> val sqltext = """
>
> SELECT rs.Month, rs.SalesChannel, round(TotalSales,2)
>
> FROM
>
> (
>
> SELECT t_t.CALENDAR_MONTH_DESC AS Month, t_c.CHANNEL_DESC AS SalesChannel,
> SUM(t_s.AMOUNT_SOLD) AS TotalSales
>
> FROM t_s, t_t, t_c
>
> WHERE t_s.TIME_ID = t_t.TIME_ID
>
> AND   t_s.CHANNEL_ID = t_c.CHANNEL_ID
>
> GROUP BY t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ORDER by t_t.CALENDAR_MONTH_DESC, t_c.CHANNEL_DESC
>
> ) rs
>
> LIMIT 1000
>
> """
>
> HiveContext.sql(sqltext).collect.foreach(println)
>
>
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* 15 February 2016 08:44
> *To:* SRK 
> *Cc:* user 
> *Subject:* Re: How to join an RDD with a hive table?
>
>
>
> Have you tried creating a DataFrame from the RDD and join with DataFrame
> which corresponds to the hive table ?
>
>
>
> On Sun, Feb 14, 2016 at 9:53 PM, SRK  wrote:
>
> Hi,
>
> How to join an RDD with a Hive table and retrieve only the records that I
> am interested in? Suppose I have an RDD that has 1000 records and there is
> a Hive table with 100,000 records. I should be able to join the RDD with
> the Hive table by an Id and load only those 1000 records from the Hive
> table so that there are no memory issues. Also, I was planning on storing
> the data in Hive in the form of Parquet files. Any help on this is greatly
> appreciated.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-an-RDD-with-a-hive-table-tp26225.html
>
>
>


Re: Driver not able to restart the job automatically after the application of Streaming with Kafka Direct went down

2016-02-05 Thread swetha kasireddy
he application UI was up. But, in the prod job, the
> driver did not restart the application. Any idea as to why the prod driver
> not able to restart the job with everything being same in qa/prod including
> the --supervise option?
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Driver-not-able-to-restart-the-job-automatically-after-the-application-of-Streaming-with-Kafka-Direcn-tp26155.html
>
>


Help needed in deleting a message posted in Spark User List

2016-02-05 Thread swetha kasireddy
Hi,

I want to edit/delete a message posted in Spark User List. How do I do that?

Thanks!


Re: How to load partial data from HDFS using Spark SQL

2016-01-02 Thread swetha kasireddy
OK. What should the table be? Suppose I have a bunch of parquet files, do I
just specify the directory as the table?

On Fri, Jan 1, 2016 at 11:32 PM, UMESH CHAUDHARY 
wrote:

> OK, so what's wrong with using:
>
> var df=HiveContext.sql("Select * from table where id = ")
> //filtered data frame
> df.count
>
> On Sat, Jan 2, 2016 at 11:56 AM, SRK  wrote:
>
>> Hi,
>>
>> How to load partial data from hdfs using Spark SQL? Suppose I want to load
>> data based on a filter like "Select * from table where id = " using
>> Spark SQL with DataFrames, how can that be done? The idea here is that I
>> do not want to load the whole data into memory when I use the SQL and I
>> just want to load the data based on the filter.
>>
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-partial-data-from-HDFS-using-Spark-SQL-tp25855.html
>>
>>
>


Re: Spark batch getting hung up

2015-12-20 Thread swetha kasireddy
I see this happens when there is a deadlock situation. The RDD test1 makes a
Couchbase call and it seems to have threads hanging there. Even though
all the connections are closed, I see the threads related to Couchbase
causing the job to hang for some time before it gets cleared up.

Would the driver not wait till all the stuff related to test1 is completed
before calling test2 as test2 is dependent on test1?

val test1 = RDD1.mapPartitions(...)

val test2 = test1.mapPartitions(...)
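
On whether the driver waits for test1 before starting test2: as I understand it, two consecutive mapPartitions calls are narrow transformations, so Spark normally pipelines them in the same stage and the same task. The second function then pulls records from the first one's iterator on demand, and there is no driver-side barrier between them. The plain-Scala sketch below models one partition with ordinary iterators. `PipelinedPartitionSketch` and `run` are hypothetical names, and the two `map` calls are stand-ins for the per-partition functions, not the real Couchbase logic.

```scala
import scala.collection.mutable.ArrayBuffer

object PipelinedPartitionSketch {
  // Returns the results plus the order in which the two functions touched records.
  def run(partition: Iterator[Int]): (List[Int], List[String]) = {
    val log = ArrayBuffer.empty[String]
    // Stand-in for test1 = rdd.mapPartitions(...): lazy, nothing runs yet.
    val test1 = partition.map { x => log += s"test1($x)"; x * 10 }
    // Stand-in for test2 = test1.mapPartitions(...): pulls from test1 on demand.
    val test2 = test1.map { x => log += s"test2($x)"; x + 1 }
    val out = test2.toList // consuming the iterator drives both functions
    (out, log.toList)
  }

  def main(args: Array[String]): Unit = {
    val (out, log) = run(Iterator(1, 2))
    println(out)
    log.foreach(println)
  }
}
```

The log interleaves test1 and test2 record by record. One consequence, if this model holds for the real job, is that a client opened inside the first function has to stay open until its iterator is fully consumed, and background threads it leaves behind can keep the task alive after the data is done.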

On Sat, Dec 19, 2015 at 12:24 AM, Jeff Zhang <zjf...@gmail.com> wrote:

> First you need to know where the hang happens (driver or executor);
> checking the logs would be helpful
>
> On Sat, Dec 19, 2015 at 12:25 AM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> My Spark batch job sometimes seems to hang for a long time before it
>> starts the next stage or exits. Basically, it happens when there is a
>> mapPartitions/foreachPartition in a stage. Any idea as to why this is
>> happening?
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-batch-getting-hung-up-tp25735.html
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Spark metrics for ganglia

2015-12-08 Thread swetha kasireddy
Hi,

How to verify whether the GangliaSink directory got created?

Thanks,
Swetha
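
Going by the ClassNotFoundException quoted below, GangliaSink is a class rather than a directory, so one thing worth checking is whether that class is on the classpath of the Spark build in use. A minimal sketch, assuming it is pasted into spark-shell or run with the Spark assembly on the classpath; `SinkClassCheck` and `isOnClasspath` are hypothetical helper names.

```scala
import scala.util.Try

object SinkClassCheck {
  // True if the named class can be loaded from the current classpath.
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  def main(args: Array[String]): Unit = {
    val sink = "org.apache.spark.metrics.sink.GangliaSink"
    println(s"$sink available: ${isOnClasspath(sink)}")
  }
}
```

If the check reports false, the build in use was probably made without the Ganglia profile, which would match the error in the quoted message.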

On Mon, Dec 15, 2014 at 11:29 AM, danilopds <danilob...@gmail.com> wrote:

> Thanks tsingfu,
>
> I used this configuration based on your post: (with ganglia unicast mode)
> # Enable GangliaSink for all instances
> *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
> *.sink.ganglia.host=10.0.0.7
> *.sink.ganglia.port=8649
> *.sink.ganglia.period=15
> *.sink.ganglia.unit=seconds
> *.sink.ganglia.ttl=1
> *.sink.ganglia.mode=unicast
>
> Then,
> I have the following error now.
> ERROR metrics.MetricsSystem: Sink class
> org.apache.spark.metrics.sink.GangliaSink  cannot be instantialized
> java.lang.ClassNotFoundException: org.apache.spark.metrics.sink.GangliaSink
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-metrics-for-ganglia-tp14335p20690.html
>
>


Re: How to build Spark with Ganglia to enable monitoring using Ganglia

2015-12-07 Thread swetha kasireddy
OK. I think the following can be used.

mvn -Pspark-ganglia-lgpl -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0
-DskipTests clean package


On Mon, Dec 7, 2015 at 10:13 AM, SRK <swethakasire...@gmail.com> wrote:

> Hi,
>
> How to do a maven build to enable monitoring using Ganglia? What is the
> command for the same?
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-build-Spark-with-Ganglia-to-enable-monitoring-using-Ganglia-tp25625.html
>
>


Re: Effective ways monitor and identify that a Streaming job has been failing for the last 5 minutes

2015-12-06 Thread swetha kasireddy
Any documentation/sample code on how to use Ganglia with Spark?

On Sat, Dec 5, 2015 at 10:29 PM, manasdebashiskar 
wrote:

> Spark has the capability to report to Ganglia, Graphite, or JMX.
> If none of those works for you, you can register your own extra Spark
> listener that does your bidding.
>
> ..Manas
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Effective-ways-monitor-and-identify-that-a-Streaming-job-has-been-failing-for-the-last-5-minutes-tp25536p25586.html
>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Hi Cody,

How should we approach Option 2 (see below)? Which portion of the Spark
Kafka Direct code should we look at to handle this issue for our specific
requirements?


2. Catch that exception and somehow force things to "reset" for that
partition. And how would it handle the offsets already calculated in the
backlog (if there is one)?

On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger  wrote:

> If you're consistently getting offset out of range exceptions, it's
> probably because messages are getting deleted before you've processed them.
>
> The only real way to deal with this is give kafka more retention, consume
> faster, or both.
>
> If you're just looking for a quick "fix" for an infrequent issue, option 4
> is probably easiest.  I wouldn't do that automatically / silently, because
> you're losing data.
>
> On Mon, Nov 30, 2015 at 6:22 PM, SRK  wrote:
>
>> Hi,
>>
>> So, our Streaming Job fails with the following errors. If you see the
>> errors
>> below, they are all related to Kafka losing offsets and
>> OffsetOutOfRangeException.
>>
>> What are the options we have other than fixing Kafka? We would like to do
>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>> Direct?
>>
>> 1.Need to see a way to skip some offsets if they are not available after
>> the
>> max retries are reached..in that case there might be data loss.
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> 3.Track the offsets separately, restart the job by providing the offsets.
>>
>> 4.Or a straightforward approach would be to monitor the log for this
>> error,
>> and if it occurs more than X times, kill the job, remove the checkpoint
>> directory, and restart.
>>
>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([test_stream,5]))
>>
>>
>>
>> java.lang.ClassNotFoundException:
>> kafka.common.NotLeaderForPartitionException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> java.util.concurrent.RejectedExecutionException: Task
>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>> [Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>> 12112]
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>
>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>> java.lang.InterruptedException
>>
>> Caused by: java.lang.InterruptedException
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
>> in
>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>> 33.0
>> (TID 283, 172.16.97.103): UnknownReason
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
How to avoid those errors with the receiver-based approach? Suppose we are OK
with at-least-once processing and use the receiver-based approach, which uses
ZooKeeper rather than querying Kafka directly. Would these errors (Couldn't
find leader offsets for Set([test_stream,5])) be avoided?

On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org> wrote:

> KafkaRDD.scala , handleFetchErr
>
> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> How to look at Option 2(see the following)? Which portion of the code in
>> Spark Kafka Direct to look at to handle this issue specific to our
>> requirements.
>>
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> If you're consistently getting offset out of range exceptions, it's
>>> probably because messages are getting deleted before you've processed them.
>>>
>>> The only real way to deal with this is give kafka more retention,
>>> consume faster, or both.
>>>
>>> If you're just looking for a quick "fix" for an infrequent issue, option
>>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>>> because you're losing data.
>>>
>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> So, our Streaming Job fails with the following errors. If you see the
>>>> errors
>>>> below, they are all related to Kafka losing offsets and
>>>> OffsetOutOfRangeException.
>>>>
>>>> What are the options we have other than fixing Kafka? We would like to
>>>> do
>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>> Kafka
>>>> Direct?
>>>>
>>>> 1.Need to see a way to skip some offsets if they are not available
>>>> after the
>>>> max retries are reached..in that case there might be data loss.
>>>>
>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>> partition And how would it handle the offsets already calculated in the
>>>> backlog (if there is one)?
>>>>
>>>> 3.Track the offsets separately, restart the job by providing the
>>>> offsets.
>>>>
>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>> error,
>>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>>> directory, and restart.
>>>>
>>>> ERROR DirectKafkaInputDStream:
>>>> ArrayBuffer(kafka.common.UnknownException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([test_stream,5]))
>>>>
>>>>
>>>>
>>>> java.lang.ClassNotFoundException:
>>>> kafka.common.NotLeaderForPartitionException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>
>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>> [Terminated,
>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>> 12112]
>>>>
>>>>
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 10
>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>> stage
>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>
>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>> java.lang.InterruptedException
>>>>
>>>> Caused by: java.lang.InterruptedException
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 7 in
>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>>> 33.0
>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>
>>>>
>>>
>>
>


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Following is the Option 2 that I was talking about:

2. Catch that exception and somehow force things to "reset" for that
partition. And how would it handle the offsets already calculated in the
backlog (if there is one)?

On Tue, Dec 1, 2015 at 1:39 PM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code in
> Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?
>
> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> If you're consistently getting offset out of range exceptions, it's
>> probably because messages are getting deleted before you've processed them.
>>
>> The only real way to deal with this is give kafka more retention, consume
>> faster, or both.
>>
>> If you're just looking for a quick "fix" for an infrequent issue, option
>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>> because you're losing data.
>>
>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> So, our Streaming Job fails with the following errors. If you see the
>>> errors
>>> below, they are all related to Kafka losing offsets and
>>> OffsetOutOfRangeException.
>>>
>>> What are the options we have other than fixing Kafka? We would like to do
>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>> Direct?
>>>
>>> 1.Need to see a way to skip some offsets if they are not available after
>>> the
>>> max retries are reached..in that case there might be data loss.
>>>
>>> 2.Catch that exception and somehow force things to "reset" for that
>>> partition And how would it handle the offsets already calculated in the
>>> backlog (if there is one)?
>>>
>>> 3.Track the offsets separately, restart the job by providing the offsets.
>>>
>>> 4.Or a straightforward approach would be to monitor the log for this
>>> error,
>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>> directory, and restart.
>>>
>>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([test_stream,5]))
>>>
>>>
>>>
>>> java.lang.ClassNotFoundException:
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> java.util.concurrent.RejectedExecutionException: Task
>>>
>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>> [Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>> 12112]
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 10
>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>> stage
>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>
>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>> java.lang.InterruptedException
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 7 in
>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>> 33.0
>>> (TID 283, 172.16.97.103): UnknownReason
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>
>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>
>>>
>>
>


Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-30 Thread swetha kasireddy
Hi Cody,

What if the offsets that are tracked are not present in Kafka? How do I
skip those offsets and go to the next offset? Also, would specifying
rebalance.backoff.ms be of any help?

Thanks,
Swetha
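
A minimal sketch of skipping offsets that Kafka no longer has, assuming you track offsets yourself and can look up the earliest and latest offsets the broker still holds for each partition (that lookup is not shown here). The names `OffsetClampSketch`, `Available`, `Resolved` and `resolve` are hypothetical. The resolved values would then be supplied as the starting offsets when the direct stream is created, for example through the createDirectStream variant that takes explicit fromOffsets, if I recall the API correctly.

```scala
object OffsetClampSketch {
  // Assumed view of what the broker still holds for one topic partition.
  final case class Available(earliest: Long, latest: Long)

  // Where to start reading, and how many messages were given up to get there.
  final case class Resolved(startOffset: Long, skipped: Long)

  def resolve(tracked: Long, available: Available): Resolved = {
    require(available.earliest <= available.latest, "invalid broker range")
    if (tracked < available.earliest)
      // Messages were deleted before we read them: jump forward, record the gap.
      Resolved(available.earliest, available.earliest - tracked)
    else if (tracked > available.latest)
      // Tracked offset is ahead of the broker: fall back to its latest offset.
      Resolved(available.latest, 0L)
    else
      Resolved(tracked, 0L)
  }

  def main(args: Array[String]): Unit =
    println(resolve(100L, Available(150L, 500L)))
}
```

Returning the skipped count keeps the data loss visible, so it can be logged or alerted on rather than happening silently.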

On Thu, Nov 12, 2015 at 9:07 AM, Cody Koeninger <c...@koeninger.org> wrote:

> To be blunt, if you care about being able to recover from weird
> situations, you should be tracking offsets yourself and specifying offsets
> on job start, not relying on checkpoints.
>
> On Tue, Nov 10, 2015 at 3:54 AM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> I’ve seen this before during an extreme outage on the cluster, where the
>> kafka offsets checkpointed by the directstreamRdd were bigger than what
>> kafka reported. The checkpoint was therefore corrupted.
>> I don’t know the root cause but since I was stressing the cluster during
>> a reliability test I can only assume that one of the Kafka partitions was
>> restored from an out-of-sync replica and did not contain all the data.
>> Seems extreme but I don’t have another idea.
>>
>> @Cody – do you know of a way to recover from a situation like this? Can
>> someone manually delete folders from the checkpoint folder to help the job
>> recover? E.g. Go 2 steps back, hoping that kafka has those offsets.
>>
>> -adrian
>>
>> From: swetha kasireddy
>> Date: Monday, November 9, 2015 at 10:40 PM
>> To: Cody Koeninger
>> Cc: "user@spark.apache.org"
>> Subject: Re: Kafka Direct does not recover automatically when the Kafka
>> Stream gets messed up?
>>
>> OK. But, one thing that I observed is that when there is a problem with
>> Kafka Stream, unless I delete the checkpoint directory the Streaming job
>> does not restart. I guess it tries to retry the failed tasks and if it's
>> not able to recover, it fails again. Sometimes, it fails with StackOverFlow
>> Error.
>>
>> Why does the Streaming job not restart from checkpoint directory when the
>> job failed earlier with Kafka Brokers getting messed up? We have the
>> checkpoint directory in our hdfs.
>>
>> On Mon, Nov 9, 2015 at 12:34 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I don't think deleting the checkpoint directory is a good way to restart
>>> the streaming job, you should stop the spark context or at the very least
>>> kill the driver process, then restart.
>>>
>>> On Mon, Nov 9, 2015 at 2:03 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> Our job is our failsafe as we don't have Control over Kafka Stream as
>>>> of now. Can setting rebalance max retries help? We do not have any monitors
>>>> setup as of now. We need to setup the monitors.
>>>>
>>>> My idea is to have some kind of cron job that queries the Streaming
>>>> API for monitoring, say every 5 minutes, and then sends an email alert and
>>>> automatically restarts the Streaming job by deleting the checkpoint
>>>> directory. Would that help?
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> On Mon, Nov 9, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The direct stream will fail the task if there is a problem with the
>>>>> kafka broker.  Spark will retry failed tasks automatically, which should
>>>>> handle broker rebalances that happen in a timely fashion.
>>>>> spark.task.maxFailures controls the maximum number of retries before
>>>>> failing the job.  Direct stream isn't any different from any other spark
>>>>> task in that regard.
>>>>>
>>>>> The question of what kind of monitoring you need is more a question
>>>>> for your particular infrastructure and what you're already using for
>>>>> monitoring.  We put all metrics (application level or system level) into
>>>>> graphite and alert from there.
>>>>>
>>>>> I will say that if you've regularly got problems with kafka falling
>>>>> over for half an hour, I'd look at fixing that before worrying about spark
>>>>> monitoring...
>>>>>
>>>>>
>>>>> On Mon, Nov 9, 2015 at 12:26 PM, swetha <swethakasire...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> How to recover Kafka Direct automatically when there is a problem
>>>

Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-30 Thread swetha kasireddy
So, our Streaming job fails with the following errors. As you can see, the
errors (highlighted in blue below) are all related to Kafka losing offsets
and to OffsetOutOfRangeException.

What options do we have other than fixing Kafka? We would like to do
something like the following. How can we achieve 1 and 2 with Spark Kafka
Direct?

1. Find a way to skip some offsets if they are not available after the max
retries are reached. In that case there might be data loss.

2. Catch that exception and somehow force things to "reset" for that
partition. How would it handle the offsets already calculated in the
backlog (if there is one)?

3. Track the offsets separately and restart the job by providing the offsets.

4. Or, as a straightforward approach, monitor the log for this error,
and if it occurs more than X times, kill the job, remove the checkpoint
directory, and restart.
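
For options 1 and 3, one possible shape of the "skip missing offsets" step is
sketched here in plain Scala. OffsetClamp and its clamp method are
hypothetical names, not part of Spark or Kafka, and the three maps are assumed
to have been fetched elsewhere (stored offsets from your own store, earliest
and latest offsets from the Kafka brokers). The sketch only decides where each
partition should start; moving a start offset forward means the skipped
messages are lost.

```scala
// Hypothetical helper, not a Spark or Kafka API.
object OffsetClamp {
  // (topic, partition) -> offset
  type Offsets = Map[(String, Int), Long]

  /** Clamp each stored offset into [earliest, latest] for its partition.
    * Partitions without a stored offset start at `earliest`. */
  def clamp(stored: Offsets, earliest: Offsets, latest: Offsets): Offsets =
    earliest.map { case (tp, lo) =>
      val hi = latest.getOrElse(tp, lo)     // assumed upper bound for this partition
      val wanted = stored.getOrElse(tp, lo) // where we would like to resume
      tp -> math.min(math.max(wanted, lo), hi)
    }
}
```

The resulting map could then be converted to the fromOffsets argument of the
createDirectStream variant that accepts explicit starting offsets when the job
is restarted.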

ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([test_stream,5]))


java.lang.ClassNotFoundException:
kafka.common.NotLeaderForPartitionException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


java.util.concurrent.RejectedExecutionException: Task
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
12112]


org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
52.0 (TID 255, 172.16.97.97): UnknownReason

Exception in thread "streaming-job-executor-0" java.lang.Error:
java.lang.InterruptedException

Caused by: java.lang.InterruptedException

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
in stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
33.0 (TID 283, 172.16.97.103): UnknownReason

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)








On Mon, Nov 30, 2015 at 12:23 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You'd need to get the earliest or latest available offsets from kafka,
> whichever is most appropriate for your situation.
>
> The KafkaRDD will use the value of refresh.leader.backoff.ms, so you can
> try adjusting that to get a longer sleep before retrying the task.
>
> On Mon, Nov 30, 2015 at 1:50 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> What if the offsets that are tracked are no longer present in Kafka? How do I
>> skip those offsets and go to the next available offset? Also, would specifying
>> rebalance.backoff.ms be of any help?
>>
>> Thanks,
>> Swetha
>>
>> On Thu, Nov 12, 2015 at 9:07 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> To be blunt, if you care about being able to recover from weird
>>> situations, you should be tracking offsets yourself and specifying offsets
>>> on job start, not relying on checkpoints.
>>>
>>> On Tue, Nov 10, 2015 at 3:54 AM, Adrian Tanase <atan...@adobe.com>
>>> wrote:
>>>
>>>> I’ve seen this before during an extreme outage on the cluster, where
>>>> the kafka offsets checkpointed by the directstreamRdd were bigger than what
>>>> kafka reported. The checkpoint was therefore corrupted.
>>>> I don’t know the root cause but since I was stressing the cluster
>>>> during a reliability test I can only assume that one of the Kafka
>>>> partitions was restored from an out-of-sync replica and did not contain all
>>>> the data. Seems extreme but I don’t have another idea.
>>>>
>>>> @Cody – do you know of a way to recover from a situation like this? Can
>>>> someone manually delete folders from the checkpoint folder to help the job
>>>> recover? E.g. Go 2 steps back, hoping that kafka has those offsets.
>>>>
>>>> -adrian
>>>>
>>>> From: swetha kasireddy
>>>> Date: Monday, November 9, 2015 at 10:40 PM
>>>> To: Cody Koeninger
>>>> Cc: "user@spark.apache.org"
>>>> Subject: Re: Kafka Direct does not recover automatically when the
>>>> Kafka Stream gets messed up?
>>>>
>>>

Re: Automatic driver restart does not seem to be working in Spark Standalone

2015-11-28 Thread swetha kasireddy
Yes. I mean killing the Spark Job from UI. Also I use
context.awaitTermination().
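
According to the replies quoted in this thread, the supervised driver is only
restarted when it exits with a non-zero code. A minimal sketch of making that
explicit in the driver's main method follows; DriverExit and exitCodeFor are
hypothetical names, and the block argument stands in for a call such as
streamingContext.awaitTermination().

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not a Spark API.
object DriverExit {
  /** Run the blocking part of the driver and map its outcome to an exit code.
    * Only non-fatal exceptions are caught; fatal errors still propagate. */
  def exitCodeFor(block: => Unit): Int =
    Try(block) match {
      case Success(_) => 0 // clean stop: no restart expected
      case Failure(e) =>
        System.err.println(s"Driver failed: ${e.getMessage}")
        1                  // non-zero: eligible for restart under --supervise
    }
}
```

In a driver this might be used as
sys.exit(DriverExit.exitCodeFor(ssc.awaitTermination())), where ssc is the
application's StreamingContext.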

On Wed, Nov 25, 2015 at 6:23 PM, Tathagata Das <t...@databricks.com> wrote:

> What do you mean by killing the streaming job using UI? Do you mean that
> you are clicking the "kill" link in the Jobs page in the Spark UI?
>
> Also in the application, is the main thread waiting on
> streamingContext.awaitTermination()? That is designed to catch exceptions
> in running job and throw it in the main thread, so that the java program
> exits with an exception and non-zero exit code.
>
>
>
>
> On Wed, Nov 25, 2015 at 12:57 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> I am killing my Streaming job using UI. What error code does UI provide
>> if the job is killed from there?
>>
>> On Wed, Nov 25, 2015 at 11:01 AM, Kay-Uwe Moosheimer <u...@moosheimer.com>
>> wrote:
>>
>>> Tested with Spark 1.5.2. It works perfectly when the exit code is non-zero,
>>> and does not restart when the exit code equals zero.
>>>
>>>
>>> From: Prem Sure <premsure...@gmail.com>
>>> Date: Wednesday, 25 November 2015 19:57
>>> To: SRK <swethakasire...@gmail.com>
>>> Cc: <user@spark.apache.org>
>>> Subject: Re: Automatic driver restart does not seem to be working in
>>> Spark Standalone
>>>
>>> I think automatic driver restart will happen, if driver fails with
>>> non-zero exit code.
>>>
>>>   --deploy-mode cluster
>>>   --supervise
>>>
>>>
>>>
>>> On Wed, Nov 25, 2015 at 1:46 PM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am submitting my Spark job with the supervise option as shown below.
>>>> When I kill the driver and the app from the UI, the driver does not
>>>> restart automatically. This is in cluster mode. Any suggestion on how to
>>>> make Automatic Driver Restart work would be of great help.
>>>>
>>>> --supervise
>>>>
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Automatic-driver-restart-does-not-seem-to-be-working-in-Spark-Standalone-tp25478.html
>>>>
>>>>
>>>
>>
>


Re: Automatic driver restart does not seem to be working in Spark Standalone

2015-11-25 Thread swetha kasireddy
I am killing my Streaming job using UI. What error code does UI provide if
the job is killed from there?

On Wed, Nov 25, 2015 at 11:01 AM, Kay-Uwe Moosheimer <u...@moosheimer.com>
wrote:

> Tested with Spark 1.5.2. It works perfectly when the exit code is non-zero,
> and does not restart when the exit code equals zero.
>
>
> From: Prem Sure <premsure...@gmail.com>
> Date: Wednesday, 25 November 2015 19:57
> To: SRK <swethakasire...@gmail.com>
> Cc: <user@spark.apache.org>
> Subject: Re: Automatic driver restart does not seem to be working in
> Spark Standalone
>
> I think automatic driver restart will happen, if driver fails with
> non-zero exit code.
>
>   --deploy-mode cluster
>   --supervise
>
>
>
> On Wed, Nov 25, 2015 at 1:46 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> I am submitting my Spark job with supervise option as shown below. When I
>> kill the driver and the app from UI, the driver does not restart
>> automatically. This is in a cluster mode.  Any suggestion on how to make
>> Automatic Driver Restart work would be of great help.
>>
>> --supervise
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Automatic-driver-restart-does-not-seem-to-be-working-in-Spark-Standalone-tp25478.html
>>
>>
>


Re: Spark Kafka Direct Error

2015-11-24 Thread swetha kasireddy
Is it possible that the Kafka offset API is somehow returning the wrong
offsets? Each time, the job fails for different partitions with an error
similar to the one below.

Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
most recent failure: Lost task 20.3 in stage 117.0 (TID 2114, 10.227.64.52):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 221572238 for topic hubble_stream partition 88 start
221563725. This should not happen, and indicates that messages may have been
lost

On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, the direct stream only communicates with Kafka brokers, not Zookeeper
> directly.  It asks the leader for each topicpartition what the highest
> available offsets are, using the Kafka offset api.
>
> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Does Kafka direct query the offsets from the zookeeper directly? From
>> where does it get the offsets? There is data in those offsets, but somehow
>> Kafka Direct does not seem to pick it up. Other Consumers that use Zoo
>> Keeper Quorum of that Stream seems to be fine. Only Kafka Direct seems to
>> have issues. How does Kafka Direct know which offsets to query after
>> getting the initial batches from  "auto.offset.reset" -> "largest"?
>>
>> On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> No, that means that at the time the batch was scheduled, the kafka
>>> leader reported the ending offset was 221572238, but during processing,
>>> kafka stopped returning messages before reaching that ending offset.
>>>
>>> That probably means something got screwed up with Kafka - e.g. you lost
>>> a leader and lost messages in the process.
>>>
>>> On Mon, Nov 23, 2015 at 12:57 PM, swetha <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I see the following error in my Spark Kafka Direct. Would this mean that
>>>> Kafka Direct is not able to catch up with the messages and is failing?
>>>>
>>>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>>>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>>>> 10.227.64.52):
>>>> java.lang.AssertionError: assertion failed: Ran out of messages before
>>>> reaching ending offset 221572238 for topic hubble_stream partition 88
>>>> start
>>>> 221563725. This should not happen, and indicates that messages may have
>>>> been
>>>> lost
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>>>>
>>>>
>>>
>>
>


Re: Spark Kafka Direct Error

2015-11-24 Thread swetha kasireddy
I see the assertion error when I compare the offset ranges as shown below.
How do I log the offset for each message?


import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Holds the offset ranges of the current batch so foreachRDD can read them
var offsetRanges = Array[OffsetRange]()

kafkaStream.transform { rdd =>
  // Get the offset ranges in the RDD
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    LOGGER.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset} Queried offsets")
  }
  val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
    // For each partition, get size of the range in the partition,
    // and the number of items in the partition
    val off = offsetRanges(i)
    val partSize = iter.size.toLong
    val rangeSize = off.untilOffset - off.fromOffset
    Iterator((partSize, rangeSize))
  }.collect()

  // Verify whether number of elements in each partition
  // matches with the corresponding offset range
  collected.foreach { case (partSize, rangeSize) =>
    assert(partSize == rangeSize, "offset ranges are wrong")
  }
}
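
On the question of logging the offset for each message: one rough approach is
to derive it from the partition's fromOffset and the position of the message
in the partition iterator. The sketch uses plain Scala so it can be tried
without a cluster; OffsetTagging and withOffsets are hypothetical names. It
assumes the iterator yields the messages of [fromOffset, untilOffset) in order
and without gaps, which is exactly the assumption that breaks when Kafka has
lost messages, so treat the derived numbers as a debugging aid only.

```scala
// Hypothetical helper, not a Spark API.
object OffsetTagging {
  /** Pair each message of one partition with its presumed Kafka offset,
    * counting up from the partition's starting offset. */
  def withOffsets[A](fromOffset: Long, messages: Iterator[A]): Iterator[(Long, A)] =
    messages.zipWithIndex.map { case (msg, i) => (fromOffset + i, msg) }
}
```

Inside mapPartitionsWithIndex this could be called as
OffsetTagging.withOffsets(offsetRanges(i).fromOffset, iter). The
createDirectStream variant that takes a messageHandler may be a more reliable
source, since the MessageAndMetadata it receives carries the real offset.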


On Tue, Nov 24, 2015 at 8:33 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Anything's possible, but that sounds pretty unlikely to me.
> Are the partitions it's failing for all on the same leader?
> Have there been any leader rebalances?
> Do you have enough log retention?
> If you log the offset for each message as it's processed, when do you see
> the problem?
>
> On Tue, Nov 24, 2015 at 10:28 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Is it possible that the kafka offset api is somehow returning the wrong
>> offsets. Because each time the job fails for different partitions with an
>> error similar to the error that I get below.
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start
>> 221563725. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>> On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> No, the direct stream only communicates with Kafka brokers, not
>>> Zookeeper directly.  It asks the leader for each topicpartition what the
>>> highest available offsets are, using the Kafka offset api.
>>>
>>> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
>>>> Does Kafka direct query the offsets from the zookeeper directly? From
>>>> where does it get the offsets? There is data in those offsets, but somehow
>>>> Kafka Direct does not seem to pick it up. Other Consumers that use Zoo
>>>> Keeper Quorum of that Stream seems to be fine. Only Kafka Direct seems to
>>>> have issues. How does Kafka Direct know which offsets to query after
>>>> getting the initial batches from  "auto.offset.reset" -> "largest"?
>>>>
>>>> On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> No, that means that at the time the batch was scheduled, the kafka
>>>>> leader reported the ending offset was 221572238, but during
>>>>> processing, kafka stopped returning messages before reaching that ending
>>>>> offset.
>>>>>
>>>>> That probably means something got screwed up with Kafka - e.g. you
>>>>> lost a leader and lost messages in the process.
>>>>>
>>>>> On Mon, Nov 23, 2015 at 12:57 PM, swetha <swethakasire...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see the following error in my Spark Kafka Direct. Would this mean
>>>>>> that
>>>>>> Kafka Direct is not able to catch up with the messages and is failing?
>>>>>>
>>>>>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4
>>>>>> times,
>>>>>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>>>>>> 10.227.64.52):
>>>>>> java.lang.AssertionError: assertion failed: Ran out of messages before
>>>>>> reaching ending offset 221572238 for topic hubble_stream partition 88
>>>>>> start
>>>>>> 221563725. This should not happen, and indicates that messages may
>>>>>> have been
>>>>>> lost
>>>>>>
>>>>>> Thanks,
>>>>>> Swetha
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha kasireddy
OK. I see the following to query the offsets. In our Kafka Stream, the
offsets are stored in ZooKeeper and I am not updating Offsets in Zookeeper.

How does Kafka Direct know which offsets to query? Does it automatically
calculate which offsets to query? I have "auto.offset.reset" ->
"largest".

It looks like it is trying to query from the latest offset each time, and
those offsets are not available in the Kafka Stream anymore. Other
consumers that use the same stream with a ZooKeeper quorum seem to be
working fine.


 // Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array[OffsetRange]()

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
   ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }
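
As a rough model of how the ranges printed by this snippet come about: each
batch starts where the previous batch ended and stops at the latest offset the
partition leader reports when the batch is scheduled. The sketch is plain
Scala with hypothetical names (BatchRanges, OffsetSpan, next) and ignores rate
limiting; it is an illustration of the idea, not Spark's implementation.

```scala
// Hypothetical model, not Spark code.
object BatchRanges {
  final case class OffsetSpan(partition: Int, fromOffset: Long, untilOffset: Long)

  /** Compute the next batch: resume at the previous untilOffset and read up to
    * the leader's latest offset. Partitions seen for the first time start at
    * the latest offset, mirroring "auto.offset.reset" -> "largest". */
  def next(previousUntil: Map[Int, Long], leaderLatest: Map[Int, Long]): Seq[OffsetSpan] =
    leaderLatest.toSeq.sortBy(_._1).map { case (p, latest) =>
      val from = previousUntil.getOrElse(p, latest)
      OffsetSpan(p, from, math.max(from, latest))
    }
}
```

Under this model, the assertion error in the thread corresponds to a span
whose untilOffset was reported by the leader at scheduling time but whose
messages could not all be fetched later.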


On Mon, Nov 23, 2015 at 6:31 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> Also, does Kafka direct query the offsets from the zookeeper directly?
> From where does it get the offsets? There is data in those offsets, but
> somehow Kafka Direct does not seem to pick it up?
>
> On Mon, Nov 23, 2015 at 6:18 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> I mean to show the Spark Kafka Direct consumers in Kafka Stream UI.
>> Usually we create a consumer and the consumer gets shown in the Kafka
>> Stream UI. How do I log the offsets in the Spark Job?
>>
>> On Mon, Nov 23, 2015 at 6:11 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> What exactly do you mean by kafka consumer reporting?
>>>
>>> I'd log the offsets in your spark job and try running
>>>
>>> kafka-simple-consumer-shell.sh --partition $yourbadpartition
>>> --print-offsets
>>>
>>> at the same time your spark job is running
>>>
>>> On Mon, Nov 23, 2015 at 7:37 PM, swetha <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We see a bunch of issues like the following in Our Spark Kafka Direct.
>>>> Any
>>>> idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
>>>> reporting to debug this issue?
>>>>
>>>>
>>>> Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
>>>> most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
>>>> 10.227.64.52):
>>>> java.lang.AssertionError: assertion failed: Ran out of messages before
>>>> reaching ending offset 225474235 for topic hubble_stream partition 55
>>>> start
>>>> 225467496. This should not happen, and indicates that messages may have
>>>> been
>>>> lost
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
>>>>
>>>>
>>>
>>
>


Re: Spark Kafka Direct Error

2015-11-23 Thread swetha kasireddy
Does Kafka Direct query the offsets from ZooKeeper directly? Where does it
get the offsets from? There is data at those offsets, but somehow Kafka
Direct does not seem to pick it up. Other consumers that use the ZooKeeper
quorum of that stream seem to be fine. Only Kafka Direct seems to have
issues. How does Kafka Direct know which offsets to query after getting the
initial batches from "auto.offset.reset" -> "largest"?

On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, that means that at the time the batch was scheduled, the kafka leader
> reported the ending offset was 221572238, but during processing, kafka
> stopped returning messages before reaching that ending offset.
>
> That probably means something got screwed up with Kafka - e.g. you lost a
> leader and lost messages in the process.
>
> On Mon, Nov 23, 2015 at 12:57 PM, swetha <swethakasire...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I see the following error in my Spark Kafka Direct. Would this mean that
>> Kafka Direct is not able to catch up with the messages and is failing?
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start
>> 221563725. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>>
>>
>


How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha
Hi,

We see a bunch of issues like the following in our Spark Kafka Direct job.
Any idea how to make Kafka Direct consumers show up in Kafka consumer
reporting, so that we can debug this issue?


Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
most recent failure: Lost task 47.3 in stage 336.0 (TID 5283, 10.227.64.52):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 225474235 for topic hubble_stream partition 55 start
225467496. This should not happen, and indicates that messages may have been
lost



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html



Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha kasireddy
I mean to show the Spark Kafka Direct consumers in Kafka Stream UI. Usually
we create a consumer and the consumer gets shown in the Kafka Stream UI.
How do I log the offsets in the Spark Job?

On Mon, Nov 23, 2015 at 6:11 PM, Cody Koeninger <c...@koeninger.org> wrote:

> What exactly do you mean by kafka consumer reporting?
>
> I'd log the offsets in your spark job and try running
>
> kafka-simple-consumer-shell.sh --partition $yourbadpartition
> --print-offsets
>
> at the same time your spark job is running
>
> On Mon, Nov 23, 2015 at 7:37 PM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> We see a bunch of issues like the following in Our Spark Kafka Direct. Any
>> idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
>> reporting to debug this issue?
>>
>>
>> Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
>> most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 225474235 for topic hubble_stream partition 55
>> start
>> 225467496. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
>>
>>
>


Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha kasireddy
Also, does Kafka direct query the offsets from the zookeeper directly? From
where does it get the offsets? There is data in those offsets, but somehow
Kafka Direct does not seem to pick it up?

On Mon, Nov 23, 2015 at 6:18 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> I mean to show the Spark Kafka Direct consumers in Kafka Stream UI.
> Usually we create a consumer and the consumer gets shown in the Kafka
> Stream UI. How do I log the offsets in the Spark Job?
>
> On Mon, Nov 23, 2015 at 6:11 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> What exactly do you mean by kafka consumer reporting?
>>
>> I'd log the offsets in your spark job and try running
>>
>> kafka-simple-consumer-shell.sh --partition $yourbadpartition
>> --print-offsets
>>
>> at the same time your spark job is running
>>
>> On Mon, Nov 23, 2015 at 7:37 PM, swetha <swethakasire...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We see a bunch of issues like the following in Our Spark Kafka Direct.
>>> Any
>>> idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
>>> reporting to debug this issue?
>>>
>>>
>>> Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
>>> most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
>>> 10.227.64.52):
>>> java.lang.AssertionError: assertion failed: Ran out of messages before
>>> reaching ending offset 225474235 for topic hubble_stream partition 55
>>> start
>>> 225467496. This should not happen, and indicates that messages may have
>>> been
>>> lost
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
>>>
>>>
>>
>


Spark Kafka Direct Error

2015-11-23 Thread swetha
Hi,

I see the following error in my Spark Kafka Direct. Would this mean that
Kafka Direct is not able to catch up with the messages and is failing?

Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
most recent failure: Lost task 20.3 in stage 117.0 (TID 2114, 10.227.64.52):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 221572238 for topic hubble_stream partition 88 start
221563725. This should not happen, and indicates that messages may have been
lost

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html



Re: Streaming Job gives error after changing to version 1.5.2

2015-11-19 Thread swetha kasireddy
That was actually an issue with our Mesos.

On Wed, Nov 18, 2015 at 5:29 PM, Tathagata Das <t...@databricks.com> wrote:

> If possible, could you give us the root cause and solution for future
> readers of this thread.
>
> On Wed, Nov 18, 2015 at 6:37 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> It works fine after some changes.
>>
>> -Thanks,
>> Swetha
>>
>> On Tue, Nov 17, 2015 at 10:22 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Can you verify that the cluster is running the correct version of Spark.
>>> 1.5.2.
>>>
>>> On Tue, Nov 17, 2015 at 7:23 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
>>>> Sorry, <scope>compile</scope> makes it work locally. But the cluster
>>>> still seems to have issues with <scope>provided</scope>. Basically, it
>>>> does not seem to process any records; no data is shown in any of the tabs
>>>> of the Streaming UI except the Streaming tab. Executors, Storage, Stages
>>>> etc. show empty RDDs.
>>>>
>>>> On Tue, Nov 17, 2015 at 7:19 PM, swetha kasireddy <
>>>> swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi TD,
>>>>>
>>>>> Basically, I see two issues. With <scope>provided</scope> the job
>>>>> does not start locally. It does start in the cluster, but it seems no
>>>>> data is getting processed.
>>>>>
>>>>> Thanks,
>>>>> Swetha
>>>>>
>>>>> On Tue, Nov 17, 2015 at 7:04 PM, Tim Barthram <tim.barth...@iag.com.au
>>>>> > wrote:
>>>>>
>>>>>> If you are running a local context, could it be that you should use:
>>>>>>
>>>>>>
>>>>>>
>>>>>> provided
>>>>>>
>>>>>>
>>>>>>
>>>>>> ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* swetha kasireddy [mailto:swethakasire...@gmail.com]
>>>>>> *Sent:* Wednesday, 18 November 2015 2:01 PM
>>>>>> *To:* Tathagata Das
>>>>>> *Cc:* user
>>>>>> *Subject:* Re: Streaming Job gives error after changing to version
>>>>>> 1.5.2
>>>>>>
>>>>>>
>>>>>>
>>>>>> This error I see locally.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 17, 2015 at 5:44 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>> Are you running 1.5.2-compiled jar on a Spark 1.5.2 cluster?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 17, 2015 at 5:34 PM, swetha <swethakasire...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see  java.lang.NoClassDefFoundError after changing the Streaming job
>>>>>> version to 1.5.2. Any idea as to why this is happening? Following are
>>>>>> my
>>>>>> dependencies and the error that I get.
>>>>>>
>>>>>>   
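>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>
>>>>>>
>>>>>> <dependency>
>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>>>     <version>${sparkVersion}</version>
>>>>>>     <scope>provided</scope>
>>>>>> </dependency>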
>>>>>> 
>>>>>>
>>>>>>
>>>>>> 
>>>>>> org.apache.spark
>>>>>> spark-hive_2.10
>>>>>> ${sparkVersion}
>>>&g

Re: How to clear the temp files that gets created by shuffle in Spark Streaming

2015-11-19 Thread swetha kasireddy
OK. We have a long-running streaming job. I was thinking that maybe we
should have a cron job to clear files that are older than 2 days. What would
be an appropriate way to do that?
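
A minimal sketch of the selection step for such a cleanup, assuming the temp files sit under a local scratch directory and that anything untouched for two days is no longer needed by the running job (neither assumption is confirmed in this thread). StaleShuffleFiles and olderThan are illustrative names; the code only lists candidates and leaves deletion to the caller.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._

object StaleShuffleFiles {
  /** Regular files under `root` last modified more than `maxAgeDays` days before `nowMillis`. */
  def olderThan(root: Path, maxAgeDays: Long, nowMillis: Long): List[Path] = {
    val cutoff = nowMillis - TimeUnit.DAYS.toMillis(maxAgeDays)
    // Files.walk (Java 8+) returns a stream that must be closed after use.
    val stream = Files.walk(root)
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .filter(p => Files.getLastModifiedTime(p).toMillis < cutoff)
        .toList // materialize before the stream is closed
    } finally stream.close()
  }
}
```

A cron-driven cleaner could call StaleShuffleFiles.olderThan(dir, 2, System.currentTimeMillis()) for each directory configured in spark.local.dir and delete the returned paths. Deleting files that a running application still needs can make it fail, so the age threshold should be chosen with the job's batch and window durations in mind.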

On Wed, Nov 18, 2015 at 7:43 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you seen SPARK-5836 ?
> Note TD's comment at the end.
>
> Cheers
>
> On Wed, Nov 18, 2015 at 7:28 PM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> We have a lot of temp files that get created due to shuffles caused by
>> group by. How do we clear the files that get created due to intermediate
>> operations in group by?
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-clear-the-temp-files-that-gets-created-by-shuffle-in-Spark-Streaming-tp25425.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


FastUtil DataStructures in Spark

2015-11-19 Thread swetha
Hi,

Has anybody used the FastUtil equivalent of HashSet for Strings in Spark? Any
example would be of great help.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/FastUtil-DataStructures-in-Spark-tp25429.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to return a pair RDD from an RDD that has foreachPartition applied?

2015-11-18 Thread swetha kasireddy
Looks like I can use mapPartitions, but can it be done using
foreachPartition?
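
Since foreachPartition returns Unit, one option is mapPartitions, which can perform the save as a side effect and still hand the records back. Below is a minimal sketch of the per-partition function, under stated assumptions: PointSink is a hypothetical stand-in for the DataLoaderImpl in the question, and the record type is simplified to a key plus a map of day counts.

```scala
// Hypothetical sink standing in for the DataLoaderImpl used in the question.
trait PointSink {
  def save(points: Seq[(String, (Long, Float))]): Unit
}

object SaveAndEmit {
  import scala.collection.JavaConverters._

  /** Body for mapPartitions: flattens one partition's day counts, saves them, and returns them. */
  def perPartition(sink: PointSink)(
      partition: Iterator[(String, java.util.Map[java.lang.Long, java.lang.Float])]
  ): Iterator[(String, (Long, Float))] = {
    // Materialize the partition once so the same pairs can be saved and returned.
    val pairs = partition.flatMap { case (key, dayCounts) =>
      // Iterate the entries (not Map.map) so repeated keys are all kept.
      dayCounts.asScala.iterator.map { case (ts, count) =>
        (key, (ts.longValue, count.floatValue))
      }
    }.toVector
    sink.save(pairs)
    pairs.iterator
  }
}
```

With Spark this would be applied as records.mapPartitions(part => SaveAndEmit.perPartition(sink)(part)), provided sink is serializable or is constructed inside the partition. mapPartitions is lazy, so nothing is saved until an action runs on the result, and the save can run again if the result is recomputed; persisting the returned RDD avoids that.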

On Tue, Nov 17, 2015 at 11:51 PM, swetha <swethakasire...@gmail.com> wrote:

> Hi,
>
> How do I return an RDD of key/value pairs from an RDD that has
> foreachPartition applied? My code looks something like the following. It
> looks like foreachPartition can only have the return type Unit. How do I
> apply foreachPartition, do a save, and at the same time return a pair RDD?
>
> def saveDataPointsBatchNew(records: RDD[(String, (Long,
>     java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
>     java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
>     java.util.HashSet[java.lang.String], Boolean))]) = {
>   records.foreachPartition({ partitionOfRecords =>
>     val dataLoader = new DataLoaderImpl();
>     var metricList = new java.util.ArrayList[String]();
>     var storageTimeStamp = 0L
>
>     if (partitionOfRecords != null) {
>       // entrySet, itr and metricsDayCounts are not declared in this excerpt
>       partitionOfRecords.foreach(record => {
>         if (record._2._1 == 0L) {
>           entrySet = record._2._3.entrySet()
>           itr = entrySet.iterator();
>           while (itr.hasNext()) {
>             val entry = itr.next();
>             storageTimeStamp = entry.getKey.toLong
>             val dayCounts = entry.getValue
>             metricsDayCounts += record._1 -> (storageTimeStamp, dayCounts.toFloat)
>           }
>         }
>       })
>     }
>
>     //Code to insert the last successful batch/streaming timestamp  ends
>     dataLoader.saveDataPoints(metricList);
>     metricList = null
>   })
> }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-return-a-pair-RDD-from-an-RDD-that-has-foreachPartition-applied-tp25411.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Streaming Job gives error after changing to version 1.5.2

2015-11-18 Thread swetha kasireddy
It works fine after some changes.

-Thanks,
Swetha

On Tue, Nov 17, 2015 at 10:22 PM, Tathagata Das <t...@databricks.com> wrote:

> Can you verify that the cluster is running the correct version of Spark.
> 1.5.2.
>
> On Tue, Nov 17, 2015 at 7:23 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Sorry, <scope>compile</scope> makes it work locally. But the cluster
>> still seems to have issues with <scope>provided</scope>. Basically, it
>> does not seem to process any records; no data is shown in any of the tabs
>> of the Streaming UI except the Streaming tab. Executors, Storage, Stages,
>> etc. show empty RDDs.
>>
>> On Tue, Nov 17, 2015 at 7:19 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> Basically, I see two issues. With <scope>provided</scope> the job does
>>> not start locally. It does start on the cluster, but it seems no data is
>>> getting processed.
>>>
>>> Thanks,
>>> Swetha
>>>
>>> On Tue, Nov 17, 2015 at 7:04 PM, Tim Barthram <tim.barth...@iag.com.au>
>>> wrote:
>>>
>>>> If you are running a local context, could it be that you should use:
>>>>
>>>>
>>>>
>>>> <scope>provided</scope>
>>>>
>>>>
>>>>
>>>> ?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Tim
>>>>
>>>>
>>>>
>>>> *From:* swetha kasireddy [mailto:swethakasire...@gmail.com]
>>>> *Sent:* Wednesday, 18 November 2015 2:01 PM
>>>> *To:* Tathagata Das
>>>> *Cc:* user
>>>> *Subject:* Re: Streaming Job gives error after changing to version
>>>> 1.5.2
>>>>
>>>>
>>>>
>>>> This error I see locally.
>>>>
>>>>
>>>>
>>>> On Tue, Nov 17, 2015 at 5:44 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>> Are you running 1.5.2-compiled jar on a Spark 1.5.2 cluster?
>>>>
>>>>
>>>>
>>>> On Tue, Nov 17, 2015 at 5:34 PM, swetha <swethakasire...@gmail.com>
>>>> wrote:
>>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> I see  java.lang.NoClassDefFoundError after changing the Streaming job
>>>> version to 1.5.2. Any idea as to why this is happening? Following are my
>>>> dependencies and the error that I get.
>>>>
>>>>   
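>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-core_2.10</artifactId>
>>>>     <version>${sparkVersion}</version>
>>>>     <scope>provided</scope>
>>>> </dependency>
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming_2.10</artifactId>
>>>>     <version>${sparkVersion}</version>
>>>>     <scope>provided</scope>
>>>> </dependency>
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-sql_2.10</artifactId>
>>>>     <version>${sparkVersion}</version>
>>>>     <scope>provided</scope>
>>>> </dependency>
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-hive_2.10</artifactId>
>>>>     <version>${sparkVersion}</version>
>>>>     <scope>provided</scope>
>>>> </dependency>
>>>>
>>>> <dependency>
>>>>     <groupId>org.apache.spark</groupId>
>>>>     <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>     <version>${sparkVersion}</version>
>>>> </dependency>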
>>>> 
>>>>
>>>>
>>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>>> org/apache/spark/streaming/StreamingContext
>>>> at java.lang.Class.getDeclaredMethods0(Native Method)
>>>> at java.lang.Class.privateGetDeclaredMethods(Class.java:2693)
>>>> at java.lang.Class.privateGetMethodRecursive(Class.java:3040)
>>>> at java.lang.Class.getMethod0(Class.java:3010)
>>>> at java.lang.Class.getMethod(Class.java:1776)
>>>> at
>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:125)
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> org.apache.spark.streaming.StreamingContext
>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>>>> at java.net.URLClassLoader$1.run(URLClassLo

How to clear the temp files that gets created by shuffle in Spark Streaming

2015-11-18 Thread swetha
Hi,

We have a lot of temp files that get created due to shuffles caused by
group by. How do we clear the files that get created due to intermediate
operations in group by?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-clear-the-temp-files-that-gets-created-by-shuffle-in-Spark-Streaming-tp25425.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Streaming Job gives error after changing to version 1.5.2

2015-11-17 Thread swetha


Hi,

I see  java.lang.NoClassDefFoundError after changing the Streaming job
version to 1.5.2. Any idea as to why this is happening? Following are my
dependencies and the error that I get.

  
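<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>${sparkVersion}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${sparkVersion}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>${sparkVersion}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>${sparkVersion}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>${sparkVersion}</version>
</dependency>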



Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/StreamingContext
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2693)
at java.lang.Class.privateGetMethodRecursive(Class.java:3040)
at java.lang.Class.getMethod0(Class.java:3010)
at java.lang.Class.getMethod(Class.java:1776)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:125)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.streaming.StreamingContext
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Job-gives-error-after-changing-to-version-1-5-2-tp25406.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Streaming Job gives error after changing to version 1.5.2

2015-11-17 Thread swetha kasireddy
This error I see locally.

On Tue, Nov 17, 2015 at 5:44 PM, Tathagata Das <t...@databricks.com> wrote:

> Are you running 1.5.2-compiled jar on a Spark 1.5.2 cluster?
>
> On Tue, Nov 17, 2015 at 5:34 PM, swetha <swethakasire...@gmail.com> wrote:
>
>>
>>
>> Hi,
>>
>> I see  java.lang.NoClassDefFoundError after changing the Streaming job
>> version to 1.5.2. Any idea as to why this is happening? Following are my
>> dependencies and the error that I get.
>>
>>   
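>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-core_2.10</artifactId>
>>     <version>${sparkVersion}</version>
>>     <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-streaming_2.10</artifactId>
>>     <version>${sparkVersion}</version>
>>     <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-sql_2.10</artifactId>
>>     <version>${sparkVersion}</version>
>>     <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-hive_2.10</artifactId>
>>     <version>${sparkVersion}</version>
>>     <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-streaming-kafka_2.10</artifactId>
>>     <version>${sparkVersion}</version>
>> </dependency>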
>> 
>>
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/StreamingContext
>> at java.lang.Class.getDeclaredMethods0(Native Method)
>> at java.lang.Class.privateGetDeclaredMethods(Class.java:2693)
>> at java.lang.Class.privateGetMethodRecursive(Class.java:3040)
>> at java.lang.Class.getMethod0(Class.java:3010)
>> at java.lang.Class.getMethod(Class.java:1776)
>> at
>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:125)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.StreamingContext
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Job-gives-error-after-changing-to-version-1-5-2-tp25406.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


How to return a pair RDD from an RDD that has foreachPartition applied?

2015-11-17 Thread swetha
Hi,

How do I return an RDD of key/value pairs from an RDD that has
foreachPartition applied? My code looks something like the following. It
looks like foreachPartition can only have the return type Unit. How do I
apply foreachPartition, do a save, and at the same time return a pair RDD?

def saveDataPointsBatchNew(records: RDD[(String, (Long,
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.HashSet[java.lang.String], Boolean))]) = {
  records.foreachPartition({ partitionOfRecords =>
    val dataLoader = new DataLoaderImpl();
    var metricList = new java.util.ArrayList[String]();
    var storageTimeStamp = 0L

    if (partitionOfRecords != null) {
      // entrySet, itr and metricsDayCounts are not declared in this excerpt
      partitionOfRecords.foreach(record => {
        if (record._2._1 == 0L) {
          entrySet = record._2._3.entrySet()
          itr = entrySet.iterator();
          while (itr.hasNext()) {
            val entry = itr.next();
            storageTimeStamp = entry.getKey.toLong
            val dayCounts = entry.getValue
            metricsDayCounts += record._1 -> (storageTimeStamp, dayCounts.toFloat)
          }
        }
      })
    }

    //Code to insert the last successful batch/streaming timestamp  ends
    dataLoader.saveDataPoints(metricList);
    metricList = null
  })
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-return-a-pair-RDD-from-an-RDD-that-has-foreachPartition-applied-tp25411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: parquet.io.ParquetEncodingException Warning when trying to save parquet file in Spark

2015-11-09 Thread swetha kasireddy
I am using the following:



<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.6.0</version>
</dependency>



On Mon, Nov 9, 2015 at 1:00 AM, Fengdong Yu <fengdo...@everstring.com>
wrote:

> Which Spark version are you using?
>
> It was fixed in Parquet 1.7.x, so Spark 1.5.x will work.
>
>
>
>
> > On Nov 9, 2015, at 3:43 PM, swetha <swethakasire...@gmail.com> wrote:
> >
> > Hi,
> >
> > I see an unwanted warning when I try to save a Parquet file in HDFS in
> > Spark. Please find below the code and the warning message. Any idea as to
> > how to avoid the unwanted warning message?
> >
> > activeSessionsToBeSaved.saveAsNewAPIHadoopFile("test", classOf[Void],
> > classOf[ActiveSession],
> >  classOf[ParquetOutputFormat[ActiveSession]], job.getConfiguration)
> >
> > Nov 8, 2015 11:35:39 PM WARNING: parquet.hadoop.ParquetOutputCommitter:
> > could not write summary file for active_sessions_current
> > parquet.io.ParquetEncodingException:
> > maprfs:/user/testId/active_sessions_current/part-r-00142.parquet invalid:
> > all the files must be contained in the root active_sessions_current
> >   at parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:422)
> >   at parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:398)
> >   at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:51)
> >   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1056)
> >   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:998)
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/parquet-io-ParquetEncodingException-Warning-when-trying-to-save-parquet-file-in-Spark-tp25326.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
>


Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-09 Thread swetha kasireddy
OK. But one thing that I observed is that when there is a problem with the
Kafka stream, the Streaming job does not restart unless I delete the
checkpoint directory. I guess it retries the failed tasks and, if it's
not able to recover, it fails again. Sometimes it fails with a
StackOverflowError.

Why does the Streaming job not restart from checkpoint directory when the
job failed earlier with Kafka Brokers getting messed up? We have the
checkpoint directory in our hdfs.

On Mon, Nov 9, 2015 at 12:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I don't think deleting the checkpoint directory is a good way to restart
> the streaming job, you should stop the spark context or at the very least
> kill the driver process, then restart.
>
> On Mon, Nov 9, 2015 at 2:03 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> Our job is our failsafe, as we don't have control over the Kafka stream as
>> of now. Can setting rebalance max retries help? We do not have any monitors
>> set up as of now. We need to set up the monitors.
>>
>> My idea is to have some kind of cron job that queries the Streaming
>> monitoring API, say every 5 minutes, then sends an email alert and
>> automatically restarts the Streaming job by deleting the checkpoint
>> directory. Would that help?
>>
>>
>>
>> Thanks!
>>
>> On Mon, Nov 9, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> The direct stream will fail the task if there is a problem with the
>>> kafka broker.  Spark will retry failed tasks automatically, which should
>>> handle broker rebalances that happen in a timely fashion.
>>> spark.task.maxFailures controls the maximum number of retries before failing
>>> the job.  Direct stream isn't any different from any other spark task in
>>> that regard.
>>>
>>> The question of what kind of monitoring you need is more a question for
>>> your particular infrastructure and what you're already using for
>>> monitoring.  We put all metrics (application level or system level) into
>>> graphite and alert from there.
>>>
>>> I will say that if you've regularly got problems with kafka falling over
>>> for half an hour, I'd look at fixing that before worrying about spark
>>> monitoring...
>>>
>>>
>>> On Mon, Nov 9, 2015 at 12:26 PM, swetha <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> How do I recover Kafka Direct automatically when there is a problem with
>>>> the Kafka brokers? Sometimes our Kafka brokers get messed up and the entire
>>>> Streaming job blows up, unlike some other consumers, which do recover
>>>> automatically. How can I make sure that Kafka Direct recovers automatically
>>>> when the brokers fail for some time, say 30 minutes? What kind of monitors
>>>> should be in place to recover the job?
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Direct-does-not-recover-automatically-when-the-Kafka-Stream-gets-messed-up-tp25331.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>>
>>>>
>>>
>>
>


Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-09 Thread swetha kasireddy
I store some metrics and the RDD which is the output of updateStateByKey in
my checkpoint directory. I will retest and check for the error that I get.
But it's mostly the StackOverflowError that I get. So might increasing the
stack size help?

On Mon, Nov 9, 2015 at 12:45 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Without knowing more about what's being stored in your checkpoint
> directory / what the log output is, it's hard to say.  But either way, just
> deleting the checkpoint directory probably isn't sufficient to restart the
> job...
>
> On Mon, Nov 9, 2015 at 2:40 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> OK. But one thing that I observed is that when there is a problem with the
>> Kafka stream, the Streaming job does not restart unless I delete the
>> checkpoint directory. I guess it retries the failed tasks and, if it's
>> not able to recover, it fails again. Sometimes it fails with a
>> StackOverflowError.
>>
>> Why does the Streaming job not restart from checkpoint directory when the
>> job failed earlier with Kafka Brokers getting messed up? We have the
>> checkpoint directory in our hdfs.
>>
>> On Mon, Nov 9, 2015 at 12:34 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I don't think deleting the checkpoint directory is a good way to restart
>>> the streaming job, you should stop the spark context or at the very least
>>> kill the driver process, then restart.
>>>
>>> On Mon, Nov 9, 2015 at 2:03 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> Our job is our failsafe, as we don't have control over the Kafka stream as
>>>> of now. Can setting rebalance max retries help? We do not have any monitors
>>>> set up as of now. We need to set up the monitors.
>>>>
>>>> My idea is to have some kind of cron job that queries the Streaming
>>>> monitoring API, say every 5 minutes, then sends an email alert and
>>>> automatically restarts the Streaming job by deleting the checkpoint
>>>> directory. Would that help?
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> On Mon, Nov 9, 2015 at 11:09 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The direct stream will fail the task if there is a problem with the
>>>>> kafka broker.  Spark will retry failed tasks automatically, which should
>>>>> handle broker rebalances that happen in a timely fashion.
>>>>> spark.task.maxFailures controls the maximum number of retries before failing
>>>>> the job.  Direct stream isn't any different from any other spark task in
>>>>> that regard.
>>>>>
>>>>> The question of what kind of monitoring you need is more a question
>>>>> for your particular infrastructure and what you're already using for
>>>>> monitoring.  We put all metrics (application level or system level) into
>>>>> graphite and alert from there.
>>>>>
>>>>> I will say that if you've regularly got problems with kafka falling
>>>>> over for half an hour, I'd look at fixing that before worrying about spark
>>>>> monitoring...
>>>>>
>>>>>
>>>>> On Mon, Nov 9, 2015 at 12:26 PM, swetha <swethakasire...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> How do I recover Kafka Direct automatically when there is a problem with
>>>>>> the Kafka brokers? Sometimes our Kafka brokers get messed up and the entire
>>>>>> Streaming job blows up, unlike some other consumers, which do recover
>>>>>> automatically. How can I make sure that Kafka Direct recovers automatically
>>>>>> when the brokers fail for some time, say 30 minutes? What kind of monitors
>>>>>> should be in place to recover the job?
>>>>>>
>>>>>> Thanks,
>>>>>> Swetha
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Direct-does-not-recover-automatically-when-the-Kafka-Stream-gets-messed-up-tp25331.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Spark IndexedRDD dependency in Maven

2015-11-09 Thread swetha

Hi ,

What is the appropriate dependency to include for Spark IndexedRDD? I get a
compilation error if I include 0.3 as the version, as shown below:

<dependency>
    <groupId>amplab</groupId>
    <artifactId>spark-indexedrdd</artifactId>
    <version>0.3</version>
</dependency>


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-IndexedRDD-dependency-in-Maven-tp25332.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Unwanted SysOuts in Spark Parquet

2015-11-08 Thread swetha
Hi,

I see a lot of unwanted SysOuts when I try to save an RDD as a Parquet file.
Following are the code and the SysOuts. Any idea as to how to avoid them?


ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])

AvroParquetOutputFormat.setSchema(job, ActiveSession.SCHEMA$)
activeSessionsToBeSaved.saveAsNewAPIHadoopFile("test", classOf[Void],
classOf[ActiveSession],
  classOf[ParquetOutputFormat[ActiveSession]], job.getConfiguration)

Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression set to false
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression: UNCOMPRESSED
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet block size to 134217728
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Dictionary is on
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Validation is off
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 29,159,377
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression set to false
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.codec.CodecConfig: Compression: UNCOMPRESSED
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet block size to 134217728
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Dictionary is on
Nov 8, 2015 11:35:33 PM INFO: parquet.hadoop.ParquetOutputFormat: Validation is off
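
The format of these lines suggests they are emitted through java.util.logging rather than log4j, under logger names starting with "parquet". Assuming that is the case (it is not confirmed in this thread), raising the level of the parent "parquet" logger should filter out the INFO records while keeping warnings. QuietParquetLogging is an illustrative name, and this is a sketch rather than a verified fix.

```scala
import java.util.logging.{Level, Logger}

object QuietParquetLogging {
  // Keep a strong reference: java.util.logging holds loggers weakly, so an
  // unreferenced logger can be garbage collected along with its level.
  val parquetLogger: Logger = Logger.getLogger("parquet")

  /** Child loggers such as parquet.hadoop.* inherit this level unless they set their own. */
  def apply(): Unit = parquetLogger.setLevel(Level.WARNING)
}
```

The call has to happen in every JVM that writes Parquet, so on executors it would need to run inside the tasks and not only on the driver.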



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unwanted-SysOuts-in-Spark-Parquet-tp25325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




parquet.io.ParquetEncodingException Warning when trying to save parquet file in Spark

2015-11-08 Thread swetha
Hi,

I see an unwanted warning when I try to save a Parquet file in HDFS in Spark.
Please find below the code and the warning message. Any idea as to how to
avoid the unwanted warning message?

activeSessionsToBeSaved.saveAsNewAPIHadoopFile("test", classOf[Void],
classOf[ActiveSession],
  classOf[ParquetOutputFormat[ActiveSession]], job.getConfiguration)

Nov 8, 2015 11:35:39 PM WARNING: parquet.hadoop.ParquetOutputCommitter:
could not write summary file for active_sessions_current
parquet.io.ParquetEncodingException:
maprfs:/user/testId/active_sessions_current/part-r-00142.parquet invalid:
all the files must be contained in the root active_sessions_current
  at parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:422)
  at parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:398)
  at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:51)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1056)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:998)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parquet-io-ParquetEncodingException-Warning-when-trying-to-save-parquet-file-in-Spark-tp25326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




What is the efficient way to Join two RDDs?

2015-11-06 Thread swetha
Hi,

What is the efficient way to join two RDDs? Would converting both the RDDs
to IndexedRDDs be of any help?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-efficient-way-to-Join-two-RDDs-tp25310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: What is the efficient way to Join two RDDs?

2015-11-06 Thread swetha kasireddy
I think they are roughly of equal size.

On Fri, Nov 6, 2015 at 3:45 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you tell us a bit more about your use case?
>
> Are the two RDDs expected to be of roughly equal size, or of vastly
> different sizes?
>
> Thanks
>
> On Fri, Nov 6, 2015 at 3:21 PM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> What is the efficient way to join two RDDs? Would converting both the RDDs
>> to IndexedRDDs be of any help?
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-efficient-way-to-Join-two-RDDs-tp25310.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: creating a distributed index

2015-11-06 Thread swetha kasireddy
Hi Ankur,

I have the following questions on IndexedRDD.

1. Does IndexedRDD support String keys? As per the current documentation,
it looks like it supports only Long.

2. Is IndexedRDD efficient when joined with another RDD? My use case is that
I need to create an IndexedRDD for a certain set of data and then get the
keys that are present in the IndexedRDD but not present in some other RDD.
How would an IndexedRDD support such a use case efficiently?


Thanks,
Swetha







On Wed, Jul 15, 2015 at 2:46 AM, Jem Tucker <jem.tuc...@gmail.com> wrote:

> This is very interesting, do you know if this version will be backwards
> compatible with older versions of Spark (1.2.0)?
>
> Thanks,
>
> Jem
>
>
> On Wed, Jul 15, 2015 at 10:04 AM Ankur Dave <ankurd...@gmail.com> wrote:
>
>> The latest version of IndexedRDD supports any key type with a defined
>> serializer
>> <https://github.com/amplab/spark-indexedrdd/blob/master/src/main/scala/edu/berkeley/cs/amplab/spark/indexedrdd/KeySerializer.scala>,
>> including Strings. It's not released yet, but you can use it from the
>> master branch if you're interested.
>>
>> Ankur <http://www.ankurdave.com/>
>>
>> On Wed, Jul 15, 2015 at 12:43 AM, Jem Tucker <jem.tuc...@gmail.com>
>> wrote:
>>
>>> With regards to Indexed structures in Spark are there any alternatives
>>> to IndexedRDD for more generic keys including Strings?
>>>
>>> Thanks
>>>
>>> Jem
>>>
>>


Re: How to unpersist a DStream in Spark Streaming

2015-11-05 Thread swetha kasireddy
It's just in the same thread. For a particular RDD, I need to uncache it
every 2 minutes to clear out the data that is present in a Map inside it.

On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Hi Swetha,
>
> Would you mind elaborating on your usage scenario for DStream unpersisting?
>
> From my understanding:
>
> 1. Spark Streaming will automatically unpersist outdated data (you already
> mentioned the configurations).
> 2. Once the streaming job is started, I think you lose control of the
> job. When would you call unpersist, and how would you call it (from
> another thread)?
>
> Thanks
> Saisai
>
>
> On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Other than setting the following.
>>
>> sparkConf.set("spark.streaming.unpersist", "true")
>> sparkConf.set("spark.cleaner.ttl", "7200s")
>>
>>
>> On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> How do I unpersist a DStream in Spark Streaming? I know that we can persist
>>> using dStream.persist() or dStream.cache(). But I don't see any method to
>>> unpersist.
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html
>>>
>>>
>>
>


Re: Efficient approach to store an RDD as a file in HDFS and read it back as an RDD?

2015-11-05 Thread swetha kasireddy
I am not looking for Spark SQL specifically. My use case is that I need to
save an RDD as a Parquet file in HDFS at the end of a batch, then load it
back and convert it into an RDD in the next batch. The RDD has a String and
a Long as its key/value pairs.
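
A minimal sketch of that round trip for an RDD[(String, Long)]. It does go through Spark SQL's DataFrame reader and writer (the natively supported Parquet route mentioned in this thread), so it is only one option. It assumes Spark 1.4 or later; the object name ParquetRoundTrip and the column names "key" and "value" are made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

object ParquetRoundTrip {
  // Writes the pairs as a two-column Parquet dataset, replacing any
  // previous output at `path` (overwrite mode is an assumption).
  def save(sqlContext: SQLContext, rdd: RDD[(String, Long)], path: String): Unit = {
    import sqlContext.implicits._
    rdd.toDF("key", "value").write.mode("overwrite").parquet(path)
  }

  // Reads the dataset back and converts each Row into a (String, Long) pair.
  def load(sqlContext: SQLContext, path: String): RDD[(String, Long)] =
    sqlContext.read.parquet(path)
      .select("key", "value")
      .rdd
      .map(row => (row.getString(0), row.getLong(1)))
}
```

The RDD returned by load carries no partitioner, so any key-based partitioning has to be re-applied after loading.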

On Wed, Nov 4, 2015 at 11:52 PM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> What scenario would you like to optimize for? If you have something more
> specific regarding your use case, the mailing list can surely provide you
> with some very good advice.
>
> If you just want to save an RDD as Avro, you can use a module from
> Databricks (the README on GitHub
> <https://github.com/databricks/spark-avro> also gives you some examples).
> Otherwise, Parquet is natively supported by Spark SQL, and the official
> documentation contains useful examples
> <http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files>.
>
> On Thu, Nov 5, 2015 at 12:09 AM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> What is an efficient approach to save an RDD as a file in HDFS and
>> retrieve it back? I was choosing between Avro, Parquet and the SequenceFile
>> format. We currently use the SequenceFile format for one of our use cases.
>>
>> Any example of how to store and retrieve an RDD in the Avro and Parquet
>> file formats would be of great help.
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-approach-to-store-an-RDD-as-a-file-in-HDFS-and-read-it-back-as-an-RDD-tp25279.html
>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: Efficient approach to store an RDD as a file in HDFS and read it back as an RDD?

2015-11-05 Thread swetha kasireddy
How do I convert a Parquet file that is saved in HDFS to an RDD after reading
the file from HDFS?

On Thu, Nov 5, 2015 at 10:02 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> Hi,
> We are using Avro with compression (Snappy). As long as you have enough
> partitions, saving won't be a problem, IMHO.
> In general, HDFS is pretty fast; S3 is less so.
> The issue with storing data is that you will lose your partitioner (even
> though the RDD has one) at load time. There is a PR that tries to solve this.
>
>
> On 5 November 2015 at 01:09, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> What is an efficient approach to save an RDD as a file in HDFS and
>> retrieve it back? I was choosing between Avro, Parquet and the SequenceFile
>> format. We currently use the SequenceFile format for one of our use cases.
>>
>> Any example of how to store and retrieve an RDD in the Avro and Parquet
>> file formats would be of great help.
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-approach-to-store-an-RDD-as-a-file-in-HDFS-and-read-it-back-as-an-RDD-tp25279.html
>>
>>
>


Re: Efficient approach to store an RDD as a file in HDFS and read it back as an RDD?

2015-11-05 Thread swetha kasireddy
OK. I found the following code that does that.

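// Package names assume Parquet 1.7+ (org.apache.parquet); older releases use parquet.*
import scala.reflect.ClassTag
import org.apache.avro.specific.SpecificRecord
import org.apache.hadoop.mapred.JobConf
import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Reads a Parquet file of Avro records and returns the records as an RDD[T].
def readParquetRDD[T <% SpecificRecord](sc: SparkContext, parquetFile: String)(
    implicit tag: ClassTag[T]): RDD[T] = {
  val jobConf = new JobConf(sc.hadoopConfiguration)
  ParquetInputFormat.setReadSupportClass(jobConf, classOf[AvroReadSupport[T]])
  sc.newAPIHadoopFile(
      parquetFile,
      classOf[ParquetInputFormat[T]],
      classOf[Void],
      tag.runtimeClass.asInstanceOf[Class[T]],
      jobConf)
    .map(_._2.asInstanceOf[T]) // drop the Void key, keep the record
}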
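
One caveat raised elsewhere in this thread is that an RDD loses its partitioner when it is loaded back from storage. A minimal sketch of re-establishing it after loading, shown with an object file rather than Parquet to keep it self-contained. The name ReloadPartitioned is hypothetical, and the choice of HashPartitioner is an assumption.

```scala
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD

object ReloadPartitioned {
  // Reads pairs previously written with saveAsObjectFile and re-applies
  // hash partitioning, because the RDD returned by objectFile has no
  // partitioner. Note that partitionBy triggers a shuffle.
  def load(sc: SparkContext, path: String, numPartitions: Int): RDD[(String, Long)] =
    sc.objectFile[(String, Long)](path)
      .partitionBy(new HashPartitioner(numPartitions))
}
```

The same partitionBy step would apply to the RDD returned by a Parquet read, at the cost of one shuffle per load.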


On Thu, Nov 5, 2015 at 2:14 PM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> No, Scala. Suppose I read the Parquet file as shown in the following. How
> would that be converted to an RDD to use in my Spark batch? I use core
> Spark. I don't use Spark SQL.
>
> ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[AminoAcid]])
> val file = sc.newAPIHadoopFile(outputDir,
>   classOf[ParquetInputFormat[AminoAcid]], classOf[Void], classOf[AminoAcid],
>   job.getConfiguration)
>
> On Thu, Nov 5, 2015 at 12:48 PM, Igor Berman <igor.ber...@gmail.com>
> wrote:
>
>> Java or Scala? I think everything is in the DataFrames tutorial,
>> e.g. if you have a DataFrame and are working from Java: toJavaRDD()
>> <https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrame.html#toJavaRDD()>
>>
>> On 5 November 2015 at 21:13, swetha kasireddy <swethakasire...@gmail.com>
>> wrote:
>>
>>> How do I convert a Parquet file that is saved in HDFS to an RDD after
>>> reading the file from HDFS?
>>>
>>> On Thu, Nov 5, 2015 at 10:02 AM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> We are using Avro with compression (Snappy). As long as you have enough
>>>> partitions, saving won't be a problem, IMHO.
>>>> In general, HDFS is pretty fast; S3 is less so.
>>>> The issue with storing data is that you will lose your partitioner
>>>> (even though the RDD has one) at load time. There is a PR that tries
>>>> to solve this.
>>>>
>>>>
>>>> On 5 November 2015 at 01:09, swetha <swethakasire...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> What is an efficient approach to save an RDD as a file in HDFS and
>>>>> retrieve it back? I was choosing between Avro, Parquet and the
>>>>> SequenceFile format. We currently use the SequenceFile format for one
>>>>> of our use cases.
>>>>>
>>>>> Any example of how to store and retrieve an RDD in the Avro and Parquet
>>>>> file formats would be of great help.
>>>>>
>>>>> Thanks,
>>>>> Swetha
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-approach-to-store-an-RDD-as-a-file-in-HDFS-and-read-it-back-as-an-RDD-tp25279.html
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Efficient approach to store an RDD as a file in HDFS and read it back as an RDD?

2015-11-05 Thread swetha kasireddy
No, Scala. Suppose I read the Parquet file as shown in the following. How
would that be converted to an RDD to use in my Spark batch? I use core
Spark. I don't use Spark SQL.

ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[AminoAcid]])
val file = sc.newAPIHadoopFile(outputDir,
  classOf[ParquetInputFormat[AminoAcid]], classOf[Void], classOf[AminoAcid],
  job.getConfiguration)

On Thu, Nov 5, 2015 at 12:48 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> Java or Scala? I think everything is in the DataFrames tutorial,
> e.g. if you have a DataFrame and are working from Java: toJavaRDD()
> <https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrame.html#toJavaRDD()>
>
> On 5 November 2015 at 21:13, swetha kasireddy <swethakasire...@gmail.com>
> wrote:
>
>> How do I convert a Parquet file that is saved in HDFS to an RDD after
>> reading the file from HDFS?
>>
>> On Thu, Nov 5, 2015 at 10:02 AM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> We are using Avro with compression (Snappy). As long as you have enough
>>> partitions, saving won't be a problem, IMHO.
>>> In general, HDFS is pretty fast; S3 is less so.
>>> The issue with storing data is that you will lose your partitioner (even
>>> though the RDD has one) at load time. There is a PR that tries to solve this.
>>>
>>>
>>> On 5 November 2015 at 01:09, swetha <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> What is an efficient approach to save an RDD as a file in HDFS and
>>>> retrieve it back? I was choosing between Avro, Parquet and the
>>>> SequenceFile format. We currently use the SequenceFile format for one
>>>> of our use cases.
>>>>
>>>> Any example of how to store and retrieve an RDD in the Avro and Parquet
>>>> file formats would be of great help.
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-approach-to-store-an-RDD-as-a-file-in-HDFS-and-read-it-back-as-an-RDD-tp25279.html
>>>>
>>>>
>>>
>>
>


Re: How to lookup by a key in an RDD

2015-11-05 Thread swetha kasireddy
I read about IndexedRDD. Is joining an IndexedRDD with another RDD that
is not an IndexedRDD efficient?
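
Separately from IndexedRDD, the lookup-by-key question in this thread's subject can be sketched with core Spark alone: when a pair RDD has a known partitioner, lookup only searches the partition the key maps to. This is a minimal sketch under assumptions; the names KeyLookup, prepare and find are made up, and it says nothing about how IndexedRDD joins perform.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

object KeyLookup {
  // Hash-partitions and caches the pairs so that later lookups can go
  // straight to the one partition a key hashes to.
  def prepare(pairs: RDD[(String, Long)], numPartitions: Int): RDD[(String, Long)] =
    pairs.partitionBy(new HashPartitioner(numPartitions)).cache()

  // Returns every value stored under `key` (empty if the key is absent).
  def find(prepared: RDD[(String, Long)], key: String): Seq[Long] =
    prepared.lookup(key)
}
```

Each lookup still launches a Spark job, so this suits occasional lookups rather than very high request rates.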

On Mon, Nov 2, 2015 at 9:56 PM, Deenar Toraskar <deenar.toras...@gmail.com>
wrote:

> Swetha
>
> Currently, IndexedRDD is an external library and not part of Spark core.
> You can use it by adding it as a dependency. There are plans to
> move it into Spark core, tracked in
> https://issues.apache.org/jira/browse/SPARK-2365. See
> https://spark-summit.org/2015/events/indexedrdd-efficient-fine-grained-updates-for-rdds/
> and https://github.com/amplab/spark-indexedrdd
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 2 November 2015 at 23:29, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Please take a look at SPARK-2365
>>
>> On Mon, Nov 2, 2015 at 3:25 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Has IndexedRDD been released yet?
>>>
>>> Thanks,
>>> Swetha
>>>
>>> On Sun, Nov 1, 2015 at 1:21 AM, Gylfi <gy...@berkeley.edu> wrote:
>>>
>>>> Hi.
>>>>
>>>> You may want to look into Indexed RDDs
>>>> https://github.com/amplab/spark-indexedrdd
>>>>
>>>> Regards,
>>>> Gylfi.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-lookup-by-a-key-in-an-RDD-tp25243p25247.html
>>>>
>>>>
>>>
>>
>


How to unpersist a DStream in Spark Streaming

2015-11-04 Thread swetha
Hi,

How do I unpersist a DStream in Spark Streaming? I know that we can persist
using dStream.persist() or dStream.cache(). But I don't see any method to
unpersist.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html


