*bump*

On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra <sourav.chan...@livestream.com> wrote:
> Hi TD,
>
> Some observations:
>
> 1. If I submit the application using the spark-submit tool with *client as
> deploy mode*, it works fine with a single master and worker (driver, master
> and worker are running on the same machine).
>
> 2. If I submit the application using the spark-submit tool with client as
> deploy mode, it *crashes after some time with StackOverflowError* with a
> *single master and 2 workers* (driver, master and 1 worker are running on
> the same machine, the other worker is on a different machine):
>
> 15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
> java.lang.StackOverflowError
>     at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
>     at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
>     at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
>     at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>
> 3. If I submit the application using the spark-submit tool with *cluster as
> deploy mode*, it *crashes after some time with an Out Of Memory error* with a
> single master and worker (driver, master and worker are running on the same
> machine):
>
> 15/04/23 05:47:49 Executor: Exception in task 0.0 in stage 858.0 (TID 1464)
> java.lang.OutOfMemoryError: Java heap space
>     at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>     at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
>     at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
>     at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
>     at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>     at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>     at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>     at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
>     at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>
> Thanks,
> Sourav
>
> On Wed, Apr 22, 2015 at 11:40 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> It could very well be that your executor memory is not enough to store
>> the state RDDs AND operate on the data. 1G per executor is quite low.
>> Definitely give it more memory. And have you tried increasing the number of
>> partitions (specify the number of partitions in updateStateByKey)?
>>
>> On Wed, Apr 22, 2015 at 2:34 AM, Sourav Chandra
>> <sourav.chan...@livestream.com> wrote:
>>
>>> Anyone?
>>>
>>> On Wed, Apr 22, 2015 at 12:29 PM, Sourav Chandra
>>> <sourav.chan...@livestream.com> wrote:
>>>
>>> > Hi Olivier,
>>> >
>>> > *The update function is as below:*
>>> >
>>> > val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {
>>> >   val previousCount = state.getOrElse((0L, 0L))._2
>>> >   var startValue: IConcurrentUsers = ConcurrentViewers(0)
>>> >   var currentCount = 0L
>>> >   val lastIndexOfConcurrentUsers =
>>> >     values.lastIndexWhere(_.isInstanceOf[ConcurrentViewers])
>>> >   val subList = values.slice(0, lastIndexOfConcurrentUsers)
>>> >   val currentCountFromSubList = subList.foldLeft(startValue)(_ op _).count + previousCount
>>> >   val lastConcurrentViewersCount = values(lastIndexOfConcurrentUsers).count
>>> >
>>> >   if (math.abs(lastConcurrentViewersCount - currentCountFromSubList) >= 1) {
>>> >     logger.error(
>>> >       s"Count using state updation $currentCountFromSubList, " +
>>> >       s"ConcurrentUsers count $lastConcurrentViewersCount" +
>>> >       s" resetting to $lastConcurrentViewersCount"
>>> >     )
>>> >     currentCount = lastConcurrentViewersCount
>>> >   }
>>> >   val remainingValuesList = values.diff(subList)
>>> >   startValue = ConcurrentViewers(currentCount)
>>> >   currentCount = remainingValuesList.foldLeft(startValue)(_ op _).count
>>> >
>>> >   if (currentCount < 0) {
>>> >     logger.error(
>>> >       s"ERROR: Got new count $currentCount < 0, value:$values, state:$state, resetting to 0"
>>> >     )
>>> >     currentCount = 0
>>> >   }
>>> >   // to stop pushing subsequent 0 after receiving first 0
>>> >   if (currentCount == 0 && previousCount == 0) None
>>> >   else Some(previousCount, currentCount)
>>> > }
>>> >
>>> > trait IConcurrentUsers {
>>> >   val count: Long
>>> >   def op(a: IConcurrentUsers): IConcurrentUsers = IConcurrentUsers.op(this, a)
>>> > }
>>> >
>>> > object IConcurrentUsers {
>>> >   def op(a: IConcurrentUsers, b: IConcurrentUsers): IConcurrentUsers = (a, b) match {
>>> >     case (_, _: ConcurrentViewers) =>
>>> >       ConcurrentViewers(b.count)
>>> >     case (_: ConcurrentViewers, _: IncrementConcurrentViewers) =>
>>> >       ConcurrentViewers(a.count + b.count)
>>> >     case (_: ConcurrentViewers, _: DecrementConcurrentViewers) =>
>>> >       ConcurrentViewers(a.count - b.count)
>>> >   }
>>> > }
>>> >
>>> > case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
>>> > case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
>>> > case class ConcurrentViewers(count: Long) extends IConcurrentUsers
>>> >
>>> > *Also, the error stack trace copied from the executor logs is:*
>>> >
>>> > java.lang.OutOfMemoryError: Java heap space
>>> >     at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>>> >     at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2564)
>>> >     at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
>>> >     at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
>>> >     at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
>>> >     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>>> >     at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >     at java.lang.reflect.Method.invoke(Method.java:601)
>>> >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>>> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>>> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
>>> >     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>> >     at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:236)
>>> >     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
>>> >     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
>>> >     at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
>>> >     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >     at java.lang.reflect.Method.invoke(Method.java:601)
>>> >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1004)
>>> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
>>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>>> >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
>>> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
>>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>>> > 15/04/21 15:51:23 ERROR ExecutorUncaughtExceptionHandler: Uncaught
>>> > exception in thread Thread[Executor task launch worker-1,5,main]
>>> >
>>> > On Wed, Apr 22, 2015 at 1:32 AM, Olivier Girardot <ssab...@gmail.com>
>>> > wrote:
>>> >
>>> >> Hi Sourav,
>>> >> Can you post your updateFunc as well, please?
>>> >>
>>> >> Regards,
>>> >>
>>> >> Olivier.
>>> >>
>>> >> On Tue, Apr 21, 2015 at 12:48, Sourav Chandra
>>> >> <sourav.chan...@livestream.com> wrote:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>> We are building a Spark Streaming application which reads from Kafka,
>>> >>> does updateStateByKey based on the received message type and finally
>>> >>> stores the result in Redis.
>>> >>>
>>> >>> After running for a few seconds the executor process gets killed,
>>> >>> throwing an OutOfMemory error.
>>> >>>
>>> >>> The code snippet is below:
>>> >>>
>>> >>> val NoOfReceiverInstances = 1
>>> >>>
>>> >>> val kafkaStreams = (1 to NoOfReceiverInstances).map(
>>> >>>   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
>>> >>> )
>>> >>> val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}
>>> >>>
>>> >>> ssc.union(kafkaStreams)
>>> >>>   .map(KafkaMessageMapper(_))
>>> >>>   .filter(...)
>>> >>>   .updateStateByKey(updateFunc)
>>> >>>   .foreachRDD(_.foreachPartition(RedisHelper.update(_)))
>>> >>>
>>> >>> object RedisHelper {
>>> >>>   private val client = scredis.Redis(
>>> >>>     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
>>> >>>   )
>>> >>>
>>> >>>   def update(itr: Iterator[(String, (Long, Long))]) {
>>> >>>     // redis save operation
>>> >>>   }
>>> >>> }
>>> >>>
>>> >>> Below is the Spark configuration:
>>> >>>
>>> >>> spark.app.name = "XXXXXXX"
>>> >>> spark.jars = "xxxx.jar"
>>> >>> spark.home = "/spark-1.1.1-bin-hadoop2.4"
>>> >>> spark.executor.memory = 1g
>>> >>> spark.streaming.concurrentJobs = 1000
>>> >>> spark.logConf = true
>>> >>> spark.cleaner.ttl = 3600 // in seconds
>>> >>> spark.default.parallelism = 12
>>> >>> spark.executor.extraJavaOptions = "-Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError"
>>> >>> spark.executor.logs.rolling.strategy = "size"
>>> >>> spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
>>> >>> spark.executor.logs.rolling.maxRetainedFiles = 10
>>> >>> spark.serializer = "org.apache.spark.serializer.KryoSerializer"
>>> >>> spark.kryo.registrator = "xxx.NoOpKryoRegistrator"
>>> >>>
>>> >>> Other configurations are below:
>>> >>>
>>> >>> streaming {
>>> >>>   // All streaming context related configs should come here
>>> >>>   batch-duration = "1 second"
>>> >>>   checkpoint-directory = "/tmp"
>>> >>>   checkpoint-duration = "10 seconds"
>>> >>>   slide-duration = "1 second"
>>> >>>   window-duration = "1 second"
>>> >>>   partitions-for-shuffle-task = 32
>>> >>> }
>>> >>> kafka {
>>> >>>   no-of-receivers = 1
>>> >>>   zookeeper-quorum = "xxxx:2181"
>>> >>>   consumer-group = "xxxxx"
>>> >>>   topic = "xxxxx:2"
>>> >>> }
>>> >>>
>>> >>> We tried different combinations:
>>> >>> - with Spark 1.1.0 and 1.1.1
>>> >>> - increasing executor memory
>>> >>> - changing the serialization strategy (switching between Kryo and
>>> >>>   plain Java serialization)
>>> >>> - changing the broadcast strategy (switching between HTTP and torrent
>>> >>>   broadcast)
>>> >>>
>>> >>> Can anyone give any insight into what we are missing here? How can we
>>> >>> fix this?
>>> >>>
>>> >>> Due to an Akka version mismatch with some other libraries we cannot
>>> >>> upgrade the Spark version.
>>> >>>
>>> >>> Thanks,
>>> >>> --
>>> >>> Sourav Chandra
>>> >>> Senior Software Engineer
>>> >>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>> >>> sourav.chan...@livestream.com
>>> >>> o: +91 80 4121 8723
>>> >>> m: +91 988 699 3746
>>> >>> skype: sourav.chandra
>>> >>> Livestream
>>> >>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main,
>>> >>> 3rd Block, Koramangala Industrial Area,
>>> >>> Bangalore 560034
>>> >>> www.livestream.com

--
Sourav Chandra
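
A note on the update function quoted in this thread: `values.lastIndexWhere(...)` returns -1 when a batch contains no `ConcurrentViewers` message, and `values(-1)` then throws an index-out-of-bounds exception. Since `updateStateByKey` also applies the update function to existing keys that received no new values in a batch, that path may well be reachable. Below is a minimal sketch, not the poster's code, of the same counting logic written as a single fold with no indexing. Assumptions on my part: the names `UpdateSketch`, `step` and `update` are made up for illustration, a batch without a snapshot simply continues from the previous count, and the mismatch logging is left out. It is plain Scala with no Spark dependency, and it is not claimed to resolve the OutOfMemoryError or StackOverflowError reported above.

```scala
// Message types as posted in the thread (trait sealed here so the match is checked).
sealed trait IConcurrentUsers { def count: Long }
case class IncrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class DecrementConcurrentViewers(count: Long) extends IConcurrentUsers
case class ConcurrentViewers(count: Long) extends IConcurrentUsers

// Hypothetical helper object; the names are illustrative only.
object UpdateSketch {
  // Apply one message to the running count.
  def step(current: Long, msg: IConcurrentUsers): Long = msg match {
    case ConcurrentViewers(n)          => n           // absolute snapshot replaces the count
    case IncrementConcurrentViewers(n) => current + n
    case DecrementConcurrentViewers(n) => current - n
  }

  // State is (previousCount, currentCount), as in the original function.
  def update(values: Seq[IConcurrentUsers],
             state: Option[(Long, Long)]): Option[(Long, Long)] = {
    val previousCount = state.map(_._2).getOrElse(0L)
    // Assumption: with no snapshot in the batch, continue from the previous count.
    val folded = values.foldLeft(previousCount)(step)
    val currentCount = math.max(folded, 0L) // negative counts are reset to 0
    // Returning None drops the key, so repeated zeros are not pushed downstream.
    if (currentCount == 0L && previousCount == 0L) None
    else Some((previousCount, currentCount))
  }
}
```

`UpdateSketch.update` has the shape `(Seq[IConcurrentUsers], Option[(Long, Long)]) => Option[(Long, Long)]`, so it should be usable with the `updateStateByKey` overload that also takes a partition count, which is what TD's suggestion about specifying the number of partitions refers to.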