Re: [Spark-Core] Long scheduling delays (1+ hour)
Hi,

This comes down to your batch duration versus your processing time. As a rule, the processing time of each batch should be lower than the batch duration. From your screenshots, your batch duration is 10 seconds but your processing time is mostly more than a minute. The backlog adds up, and you end up with a large scheduling delay.

It may be worth finding out why it takes 1 minute to process 100 records and fixing that logic. I also see batches with a higher number of events that sometimes take less processing time. Fix the code logic and the delay should go away.

Thanks & Regards
Biplob Biswas

On Wed, Nov 7, 2018 at 11:08 AM bsikander wrote:

> We are facing an issue with very long scheduling delays in Spark (up to 1+
> hours).
> We are using Spark standalone. The data is being pulled from Kafka.
>
> Any help would be much appreciated.
>
> I have attached the screenshots.
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8018/1-stats.png>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8018/4.png>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8018/3.png>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8018/2.png>
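To make the arithmetic behind this reply concrete, here is a minimal sketch in plain Python (no Spark involved). It assumes batches are processed strictly one at a time and that every batch takes the same time to process; the function name and the fixed processing time are illustrative assumptions, not something taken from the screenshots.

```python
def scheduling_delays(batch_interval_s, processing_time_s, num_batches):
    """Return the scheduling delay (in seconds) of each batch.

    Batch i is ready at i * batch_interval_s, but it can only start once
    the previous batch has finished (assumption: one batch at a time).
    """
    delays = []
    free_at = 0  # time at which the processing slot becomes free
    for i in range(num_batches):
        ready_at = i * batch_interval_s
        start_at = max(ready_at, free_at)
        delays.append(start_at - ready_at)
        free_at = start_at + processing_time_s
    return delays


# 10 s batches that each take 60 s to process: every batch waits 50 s longer
# than the one before it.
DELAYS = scheduling_delays(10, 60, 73)
```

Under these assumptions the delay grows by 50 seconds per batch, so the 73rd batch (index 72), created only 12 minutes after start-up, already waits a full hour. When the processing time stays below the batch interval, the delay stays at zero.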
Re: [Spark Shell on AWS K8s Cluster]: Is there more documentation regarding how to run spark-shell on k8s cluster?
Hi Yuqi,

Just curious, can you share the spark-submit script, and what are you passing as the --master argument?

Thanks & Regards
Biplob Biswas

On Wed, Oct 31, 2018 at 10:34 AM Gourav Sengupta wrote:

> Just out of curiosity why would you not use Glue (which is Spark on
> kubernetes) or EMR?
>
> Regards,
> Gourav Sengupta
>
> On Mon, Oct 29, 2018 at 1:29 AM Zhang, Yuqi wrote:
>
>> Hello guys,
>>
>> I am Yuqi from Teradata Tokyo. Sorry to disturb but I have some problem
>> regarding using spark 2.4 client mode function on kubernetes cluster, so I
>> would like to ask if there is some solution to my problem.
>>
>> The problem is when I am trying to run spark-shell on kubernetes v1.11.3
>> cluster on AWS environment, I couldn't successfully run stateful set using
>> the docker image built from spark 2.4. The error message is shown below.
>> The version I am using is spark v2.4.0-rc3.
>>
>> Also, I wonder if there is more documentation on how to use client-mode
>> or integrate spark-shell on kubernetes cluster. From the documentation on
>> https://github.com/apache/spark/blob/v2.4.0-rc3/docs/running-on-kubernetes.md
>> there is only a brief description. I understand it's not the official
>> released version yet, but if there is some more documentation, could you
>> please share with me?
>>
>> Thank you very much for your help!
>>
>> Error msg:
>>
>> + env
>> + sed 's/[^=]*=\(.*\)/\1/g'
>> + sort -t_ -k4 -n
>> + grep SPARK_JAVA_OPT_
>> + readarray -t SPARK_EXECUTOR_JAVA_OPTS
>> + '[' -n '' ']'
>> + '[' -n '' ']'
>> + PYSPARK_ARGS=
>> + '[' -n '' ']'
>> + R_ARGS=
>> + '[' -n '' ']'
>> + '[' '' == 2 ']'
>> + '[' '' == 3 ']'
>> + case "$SPARK_K8S_CMD" in
>> + CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
>> + exec /sbin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress= --deploy-mode client
>> Error: Missing application resource.
>> Usage: spark-submit [options] [app arguments]
>> Usage: spark-submit --kill [submission ID] --master [spark://...]
>> Usage: spark-submit --status [submission ID] --master [spark://...]
>> Usage: spark-submit run-example [options] example-class [example args]
>>
>> --
>> Yuqi Zhang
>> Software Engineer
Re: unsubsribe
You need to send the email to user-unsubscr...@spark.apache.org and not to the user group.

Thanks & Regards
Biplob Biswas

On Tue, Oct 30, 2018 at 10:59 AM Anu B Nair wrote:

> I have been sending this unsubscribe mail for the last few months! It never happens!
> If anyone can help us to unsubscribe it will be really helpful!
>
> On Tue, Oct 30, 2018 at 3:27 PM Mohan Palavancha <mohan.palavan...@gmail.com> wrote:
Re: Replacing groupBykey() with reduceByKey()
Hi Santhosh,

My name is not Bipin, it's Biplob, as is clear from my signature.

Regarding your question, I have no clue what your map operation is doing on the grouped data, so I can only suggest that you do:

    dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).reduceByKey(build_edges, 25)

Based on the return type, though, you would have to modify your build_edges function.

Thanks & Regards
Biplob Biswas

On Mon, Aug 6, 2018 at 6:28 PM Bathi CCDB wrote:

> Hey Bipin,
> Thanks for the reply. I am actually aggregating after the groupByKey()
> operation; I posted the wrong code snippet in my first email. Here is what
> I am doing:
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
> (x[0],x)).groupByKey(25).map(build_edges)
>
> Can we use reduceByKey() in this context?
>
> Santhosh
>
> On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas wrote:
>
>> Hi Santhosh,
>>
>> If you are not performing any aggregation, then I don't think you can
>> replace your groupByKey with a reduceByKey. As far as I can see you are
>> only grouping and taking 2 values of the result, so I believe you can't
>> just replace your groupByKey with that.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote:
>>
>>> I am trying to replace groupByKey() with reduceByKey(). I am a pyspark
>>> and python newbie and I am having a hard time figuring out the lambda
>>> function for the reduceByKey() operation.
>>>
>>> Here is the code
>>>
>>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
>>> (x[0],x)).groupByKey(25).take(2)
>>>
>>> Here is the return value
>>>
>>> >>> dd
>>> [(u'KEY_1', 0x107be0c50>), (u'KEY_2', at 0x107be0c10>)]
>>>
>>> and here are the iterable contents of dd[0][1]
>>>
>>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
>>> Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
>>> ...
>>> Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
>>> Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>>>
>>> My question is how do I replace it with reduceByKey() and get the same
>>> output as above?
>>>
>>> Santhosh
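One possible way to get grouped output out of a reduce step, sketched here in plain Python so it runs without Spark. The helper `reduce_by_key` is only a local stand-in for the RDD method, the sample rows are hypothetical, and the PySpark line in the closing comment is an untested assumption about how the pieces would plug into the pipeline from the thread.

```python
def to_pair(row):
    """Key each row by its first field and wrap it in a one-element list."""
    return (row[0], [row])


def merge(left, right):
    """Associative combine step: concatenate two lists of rows."""
    return left + right


def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey so the sketch runs without Spark."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Hypothetical sample rows shaped like the ones in the thread: (key, hash_fn, value).
ROWS = [
    ("KEY_1", "hash_a", "value_a"),
    ("KEY_2", "hash_b", "value_b"),
    ("KEY_1", "hash_c", "value_c"),
]

GROUPED = reduce_by_key(map(to_pair, ROWS), merge)

# Assumed PySpark equivalent (not executed here):
#   hive_context.read.orc(orcfile_dir).rdd.map(to_pair).reduceByKey(merge, 25).map(build_edges)
```

Note that collecting every row into a list this way moves about as much data as groupByKey() does, so it is unlikely to be faster. A real gain would need build_edges itself to be rewritten as a pairwise combine, which depends on what that function does.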
Re: Replacing groupBykey() with reduceByKey()
Hi Santhosh,

If you are not performing any aggregation, then I don't think you can replace your groupByKey with a reduceByKey. As far as I can see you are only grouping and taking 2 values of the result, so I believe you can't just replace your groupByKey with that.

Thanks & Regards
Biplob Biswas

On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB wrote:

> I am trying to replace groupByKey() with reduceByKey(). I am a pyspark
> and python newbie and I am having a hard time figuring out the lambda
> function for the reduceByKey() operation.
>
> Here is the code
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
> (x[0],x)).groupByKey(25).take(2)
>
> Here is the return value
>
> >>> dd
> [(u'KEY_1', 0x107be0c50>), (u'KEY_2', at 0x107be0c10>)]
>
> and here are the iterable contents of dd[0][1]
>
> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
> Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
> ...
> Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
> Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>
> My question is how do I replace it with reduceByKey() and get the same
> output as above?
>
> Santhosh
Re: Backpressure initial rate not working
Hi Todd,

Thanks a lot, that works. Although I am curious whether you know why the initialRate setting is not kicking in?

But for now the pipeline is usable again. Thanks a lot.

Thanks & Regards
Biplob Biswas

On Thu, Jul 26, 2018 at 3:03 PM Todd Nist wrote:

> Have you tried reducing the maxRatePerPartition to a lower value? Based
> on your settings, I believe you are going to be able to pull *600K* worth
> of messages from Kafka, basically:
>
> • maxRatePerPartition=15000
> • batchInterval 10s
> • 4 partitions on Ingest topic
>
> This results in a maximum ingest rate of 600K:
>
> • 4 * 10 * 15000 = 600,000 max
>
> Can you reduce the maxRatePerPartition to say 1500 for a test run? That
> should result in a more manageable batch and you can adjust from there.
>
> • 4 * 10 * 1500 = 60,000 max
>
> I know we are not setting the maxRate or initialRate, only the
> maxRatePerPartition and backpressure.enabled. I thought that maxRate was
> not applicable when using back pressure, but may be mistaken.
>
> -Todd
>
> On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas wrote:
>
>> Hi Todd,
>>
>> Thanks for the reply. I have the maxRatePerPartition set as well. Below
>> is the spark submit config we used and still got the issue. Also the *batch
>> interval is set at 10s* and *number of partitions on the topic is set to
>> 4*:
>>
>> spark2-submit --name "${YARN_NAME}" \
>>    --master yarn \
>>    --deploy-mode ${DEPLOY_MODE} \
>>    --num-executors ${NUM_EXECUTORS} \
>>    --driver-cores ${NUM_DRIVER_CORES} \
>>    --executor-cores ${NUM_EXECUTOR_CORES} \
>>    --driver-memory ${DRIVER_MEMORY} \
>>    --executor-memory ${EXECUTOR_MEMORY} \
>>    --queue ${YARN_QUEUE} \
>>    --keytab ${KEYTAB}-yarn \
>>    --principal ${PRINCIPAL} \
>>    --conf "spark.yarn.preserve.staging.files=true" \
>>    --conf "spark.yarn.submit.waitAppCompletion=false" \
>>    --conf "spark.shuffle.service.enabled=true" \
>>    --conf "spark.dynamicAllocation.enabled=true" \
>>    --conf "spark.dynamicAllocation.minExecutors=1" \
>>    --conf "spark.streaming.backpressure.enabled=true" \
>>    --conf "spark.streaming.receiver.maxRate=15000" \
>>    --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
>>    --conf "spark.streaming.backpressure.initialRate=2000" \
>>    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>    --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>>    --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>>    --files "${JAAS_CONF},${KEYTAB}" \
>>    --class "${MAIN_CLASS}" \
>>    "${ARTIFACT_FILE}"
>>
>> The first batch is huge; even if it worked for the first batch I would've
>> tried researching more. The problem is that the first batch is more than
>> 500k records.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist wrote:
>>
>>> Hi Biplob,
>>>
>>> How many partitions are on the topic you are reading from and have you
>>> set the maxRatePerPartition? iirc, spark back pressure is calculated as
>>> follows:
>>>
>>> *Spark back pressure:*
>>>
>>> Back pressure is calculated off of the following:
>>>
>>> • maxRatePerPartition=200
>>> • batchInterval 30s
>>> • 3 partitions on Ingest topic
>>>
>>> This results in a maximum ingest rate of 18K:
>>>
>>> • 3 * 30 * 200 = 18,000 max
>>>
>>> The spark.streaming.backpressure.initialRate only applies to the first
>>> batch, per docs:
>>>
>>>> This is the initial maximum receiving rate at which each receiver will
>>>> receive data for the *first batch* when the backpressure mechanism is
>>>> enabled.
>>>
>>> If you set the maxRatePerPartition and apply the above formula, I
>>> believe you will be able to achieve th
Re: Backpressure initial rate not working
Hi Todd,

Thanks for the reply. I have the maxRatePerPartition set as well. Below is the spark submit config we used and still got the issue. Also the *batch interval is set at 10s* and *number of partitions on the topic is set to 4*:

spark2-submit --name "${YARN_NAME}" \
   --master yarn \
   --deploy-mode ${DEPLOY_MODE} \
   --num-executors ${NUM_EXECUTORS} \
   --driver-cores ${NUM_DRIVER_CORES} \
   --executor-cores ${NUM_EXECUTOR_CORES} \
   --driver-memory ${DRIVER_MEMORY} \
   --executor-memory ${EXECUTOR_MEMORY} \
   --queue ${YARN_QUEUE} \
   --keytab ${KEYTAB}-yarn \
   --principal ${PRINCIPAL} \
   --conf "spark.yarn.preserve.staging.files=true" \
   --conf "spark.yarn.submit.waitAppCompletion=false" \
   --conf "spark.shuffle.service.enabled=true" \
   --conf "spark.dynamicAllocation.enabled=true" \
   --conf "spark.dynamicAllocation.minExecutors=1" \
   --conf "spark.streaming.backpressure.enabled=true" \
   --conf "spark.streaming.receiver.maxRate=15000" \
   --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
   --conf "spark.streaming.backpressure.initialRate=2000" \
   --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
   --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
   --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
   --files "${JAAS_CONF},${KEYTAB}" \
   --class "${MAIN_CLASS}" \
   "${ARTIFACT_FILE}"

The first batch is huge; even if it worked for the first batch I would've tried researching more. The problem is that the first batch is more than 500k records.

Thanks & Regards
Biplob Biswas

On Thu, Jul 26, 2018 at 2:33 PM Todd Nist wrote:

> Hi Biplob,
>
> How many partitions are on the topic you are reading from and have you set
> the maxRatePerPartition? iirc, spark back pressure is calculated as
> follows:
>
> *Spark back pressure:*
>
> Back pressure is calculated off of the following:
>
> • maxRatePerPartition=200
> • batchInterval 30s
> • 3 partitions on Ingest topic
>
> This results in a maximum ingest rate of 18K:
>
> • 3 * 30 * 200 = 18,000 max
>
> The spark.streaming.backpressure.initialRate only applies to the first
> batch, per docs:
>
>> This is the initial maximum receiving rate at which each receiver will
>> receive data for the *first batch* when the backpressure mechanism is
>> enabled.
>
> If you set the maxRatePerPartition and apply the above formula, I believe
> you will be able to achieve the results you are looking for.
>
> HTH.
>
> -Todd
>
> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas wrote:
>
>> Has anyone faced a similar issue? Is there any viable way to solve this?
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas wrote:
>>
>>> I have enabled the spark.streaming.backpressure.enabled setting and also
>>> set spark.streaming.backpressure.initialRate to 15000, but my spark
>>> job is not respecting these settings when reading from Kafka after a
>>> failure.
>>>
>>> In my kafka topic around 500k records are waiting to be processed,
>>> and they are all taken in 1 huge batch which ultimately takes a long time
>>> and fails with an executor failure exception. We don't have more resources to
>>> give in our test cluster and we expect the backpressure to kick in and take
>>> smaller batches.
>>>
>>> What can I be doing wrong?
>>>
>>> Thanks & Regards
>>> Biplob Biswas
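The per-batch ceiling Todd describes is a simple product, and it can be checked against the figures quoted in this thread. The sketch below is plain Python; the function name is made up for illustration and is not a Spark API.

```python
def max_records_per_batch(partitions, batch_interval_s, max_rate_per_partition):
    """Upper bound on records pulled in one batch:
    partitions * batch interval (seconds) * max records per second per partition.
    """
    return partitions * batch_interval_s * max_rate_per_partition


# Todd's illustrative example: 3 partitions, 30 s batches, 200 records/s each.
EXAMPLE = max_records_per_batch(3, 30, 200)

# The settings from the submit script: 4 partitions, 10 s batches, 15000 records/s each.
CURRENT = max_records_per_batch(4, 10, 15000)

# The suggested test run with maxRatePerPartition lowered to 1500.
SUGGESTED = max_records_per_batch(4, 10, 1500)
```

With the submitted settings the ceiling works out to 600,000 records, which is above the roughly 500k-record backlog, so this cap alone would not stop the whole backlog from landing in a single batch.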
Re: Backpressure initial rate not working
Has anyone faced a similar issue? Is there any viable way to solve this?

Thanks & Regards
Biplob Biswas

On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas wrote:

> I have enabled the spark.streaming.backpressure.enabled setting and also
> set spark.streaming.backpressure.initialRate to 15000, but my spark job
> is not respecting these settings when reading from Kafka after a failure.
>
> In my kafka topic around 500k records are waiting to be processed, and
> they are all taken in 1 huge batch which ultimately takes a long time and
> fails with an executor failure exception. We don't have more resources to give
> in our test cluster and we expect the backpressure to kick in and take
> smaller batches.
>
> What can I be doing wrong?
>
> Thanks & Regards
> Biplob Biswas
Backpressure initial rate not working
I have enabled the spark.streaming.backpressure.enabled setting and also set spark.streaming.backpressure.initialRate to 15000, but my Spark job is not respecting these settings when reading from Kafka after a failure.

In my Kafka topic around 500k records are waiting to be processed, and they are all taken in 1 huge batch, which ultimately takes a long time and fails with an executor failure exception. We don't have more resources to give in our test cluster, and we expect the backpressure to kick in and take smaller batches.

What can I be doing wrong?

Thanks & Regards
Biplob Biswas
Re: Using newApiHadoopRDD for reading from HBase
Can someone please help me out here, or maybe point me to some documentation on this? I could find almost nothing.

Thanks & Regards
Biplob Biswas

On Thu, Jun 28, 2018 at 11:13 AM Biplob Biswas wrote:

> Hi,
>
> I had a few questions regarding the way *newApiHadoopRDD* accesses data
> from HBase.
>
> 1. Does it load all the data from a scan operation directly in memory?
> 2. According to my understanding, the data is loaded from different
>    regions to different executors. Is that assumption/understanding correct?
> 3. If it does load all the data from the scan operation, what happens when
>    the data size is more than executor memory?
> 4. What happens when we have a huge number of column qualifiers for a
>    given row?
>
> Thanks & Regards
> Biplob Biswas
Using newApiHadoopRDD for reading from HBase
Hi,

I had a few questions regarding the way *newApiHadoopRDD* accesses data from HBase.

1. Does it load all the data from a scan operation directly in memory?
2. According to my understanding, the data is loaded from different regions to different executors. Is that assumption/understanding correct?
3. If it does load all the data from the scan operation, what happens when the data size is more than executor memory?
4. What happens when we have a huge number of column qualifiers for a given row?

Thanks & Regards
Biplob Biswas
Spark Jobs ends when assignment not found for Kafka Partition
)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/12 03:53:02 INFO scheduler.JobGenerator: Stopped JobGenerator
18/05/12 03:53:02 INFO scheduler.JobScheduler: Stopped JobScheduler
18/05/12 03:53:02 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@56a3af9e{/streaming,null,UNAVAILABLE,@Spark}
18/05/12 03:53:02 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@4ca2cc2b{/streaming/batch,null,UNAVAILABLE,@Spark}
18/05/12 03:53:02 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@39330226{/static/streaming,null,UNAVAILABLE,@Spark}
18/05/12 03:53:02 INFO streaming.StreamingContext: StreamingContext stopped successfully
18/05/12 03:53:02 INFO spark.SparkContext: Invoking stop() from shutdown hook
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on 10.2.0.12:46563 in memory (size: 4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO server.AbstractConnector: Stopped Spark@45ad8de8{HTTP/1.1,[http/1.1]}{0.0.0.0:0}
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh2.germanycentral.cloudapp.microsoftazure.de:36878 in memory (size: 4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh3.germanycentral.cloudapp.microsoftazure.de:42605 in memory (size: 4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh2.germanycentral.cloudapp.microsoftazure.de:36740 in memory (size: 4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed broadcast_537_piece0 on cdh2.germanycentral.cloudapp.microsoftazure.de:34228 in memory (size: 4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO ui.SparkUI: Stopped Spark web UI at http://10.2.0.12:37118
18/05/12 03:53:02 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
18/05/12 03:53:02 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/05/12 03:53:02 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
18/05/12 03:53:02 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/05/12 03:53:02 INFO memory.MemoryStore: MemoryStore cleared
18/05/12 03:53:02 INFO storage.BlockManager: BlockManager stopped
18/05/12 03:53:02 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/05/12 03:53:02 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/05/12 03:53:02 INFO spark.SparkContext: Successfully stopped SparkContext
18/05/12 03:53:02 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.IllegalStateException: No current assignment for partition iomkafkaconnector-deliverydata-dev-2)
18/05/12 03:53:02 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
18/05/12 03:53:02 INFO util.ShutdownHookManager: Shutdown hook called

Thanks & Regards
Biplob Biswas
Re: Prefer Structured Streaming over Spark Streaming (DStreams)?
Great to hear 2 different viewpoints, and thanks a lot for your input, Michael.

For now, our application performs an ETL process: it reads data from Kafka, stores it in HBase, then performs basic enhancement and pushes the data out on a Kafka topic.

We have a conflict of opinion here, as a few people want to go with DStreams, stating that it provides the primitive RDD abstraction and that its functionality is better and easier than Structured Streaming. We don't have any event time requirement and are not using any windowing mechanism, just some basic grouping, enhancement and storing.

That's why the question was directed towards Structured Streaming vs DStreams.

Also, when you say,

> Structured streaming is a completely new implementation that does not use
> DStreams at all, but instead directly runs jobs using RDDs

I understand it doesn't use DStreams, but I thought Structured Streaming runs jobs on RDDs via DataFrames, and that in the future, if the RDD abstraction needs to be switched, it will be done by replacing RDDs with something else. Please correct me if I understood this wrong.

Thanks & Regards
Biplob Biswas

On Thu, Feb 1, 2018 at 12:12 AM, Michael Armbrust <mich...@databricks.com> wrote:

> At this point I recommend that new applications are built using structured
> streaming. The engine was GA-ed as of Spark 2.2 and I know of several very
> large (trillions of records) production jobs that are running in Structured
> Streaming. All of our production pipelines at databricks are written using
> structured streaming as well.
>
> Regarding the comparison with RDDs: The situation here is different than
> when thinking about batch DataFrames vs. RDDs. DataFrames are "just" a
> higher-level abstraction on RDDs. Structured streaming is a completely new
> implementation that does not use DStreams at all, but instead directly runs
> jobs using RDDs. The advantages over DStreams include:
> - The ability to start and stop individual queries (rather than needing
>   to start/stop a separate StreamingContext)
> - The ability to upgrade your stream and still start from an existing
>   checkpoint
> - Support for working with Spark SQL data sources (json, parquet, etc)
> - Higher level APIs (DataFrames and SQL) and lambda functions (Datasets)
> - Support for event time aggregation
>
> At this point, with the addition of mapGroupsWithState and
> flatMapGroupsWithState, I think we should be at feature parity with
> DStreams (and the state store does incremental checkpoints that are more
> efficient than the DStream store). However if there are applications you
> are having a hard time porting over, please let us know!
>
> On Wed, Jan 31, 2018 at 5:42 AM, vijay.bvp <bvpsa...@gmail.com> wrote:
>
>> here is my two cents, experts please correct me if wrong
>>
>> it's important to understand why one over the other and for what kind of
>> use case. There might be some time in the future where low level APIs are
>> abstracted and become legacy, but for now in Spark the RDD API is the core
>> and low level API, all higher APIs translate to RDDs ultimately, and RDDs
>> are immutable.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
>> these are things that are not supported and this list needs to be
>> validated with the use case you have.
>>
>> From my experience Structured Streaming is still new and the DStreams API
>> is a mature API.
>> some things that are missing or need to be explored more:
>>
>> watermarking/windowing based on no of records in a particular window
>>
>> assuming you have watermark and windowing on event time of the data, the
>> resultant dataframe is a grouped data set, and the only thing you can do is
>> run aggregate functions. you can't simply use that output as another
>> dataframe and manipulate it. There is a custom aggregator but I feel it's
>> limited.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations
>> There is an option to do stateful operations, using GroupState, where the
>> function gets an iterator of events for that window. This is the closest
>> access to StateStore a developer could get.
>> This arbitrary state that the programmer could keep across invocations has
>> its limitations, such as: how much state could we keep? is that state
>> stored in driver memory? What happens if the spark job fails, is this
>> checkpointed or restored?
>>
>> thanks
>> Vijay
Prefer Structured Streaming over Spark Streaming (DStreams)?
Hi,

I read an article which recommended using DataFrames instead of RDD primitives. I have now read about the differences between DStreams and Structured Streaming, and Structured Streaming adds a lot of improvements such as checkpointing, windowing, sessioning, fault tolerance etc.

What I am interested to know is: if I have to start a new project, is there any reason to prefer Structured Streaming over DStreams?

One argument is that the engine is abstracted with Structured Streaming, so one can change the micro-batching engine to something like the continuous processing engine.

Apart from that, is there any special reason? Will there be a point in time when DStreams and RDDs become obsolete?
Conflict resolution for data in spark streaming
Hi,

I have a situation where updates arrive from 2 different data sources, and at times they land in the same batch, as defined by the streaming context duration parameter of 500 ms (recommended in Spark according to the documentation).

That in itself is not the problem. The problem is that once the data is partitioned across different executors, it is not processed in the order in which it originally arrived. I know this because the event that arrives last should determine the updated state, and that is not always what happens. This race condition exists, and its outcome is not consistent.

Does anyone have an idea how to fix this? I am not sure whether anyone has faced this kind of issue, or fixed something like it.

Thanks & Regards
Biplob Biswas
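One common way to make such an update independent of processing order is to let each event carry its own event time and resolve conflicts with a "latest wins" merge, rather than relying on arrival order. The sketch below is plain Python and only illustrates the idea; the field names (`key`, `event_time`, `source`, `value`) and the sample events are hypothetical, and it assumes every event carries a timestamp comparable across both sources.

```python
from itertools import permutations


def latest(a, b):
    """Pick the event with the larger (event_time, source) pair.

    The source name only breaks ties between equal timestamps, so the
    outcome does not depend on the order of the arguments.
    """
    key_a = (a["event_time"], a["source"])
    key_b = (b["event_time"], b["source"])
    return a if key_a >= key_b else b


def resolve(events):
    """Fold events into a per-key state, keeping the latest event per key."""
    state = {}
    for event in events:
        k = event["key"]
        state[k] = latest(state[k], event) if k in state else event
    return state


# Hypothetical events from two sources landing in the same batch.
EVENTS = [
    {"key": "k1", "event_time": 100, "source": "A", "value": "old"},
    {"key": "k1", "event_time": 105, "source": "B", "value": "new"},
    {"key": "k2", "event_time": 101, "source": "A", "value": "only"},
]

# Every processing order produces the same final state.
STATES = [resolve(order) for order in permutations(EVENTS)]
```

Because `latest` is associative and commutative for events with distinct (event_time, source) pairs, a merge of this shape could in principle be used as the function in a per-key reduce, though whether that fits the actual job is an open question.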
Re: Spark Streaming - Increasing number of executors slows down processing rate
Hi Edwin,

I have faced a similar issue as well, and this behaviour is very abrupt. I even created a question on StackOverflow but there is no solution yet.

https://stackoverflow.com/questions/43496205/spark-job-processing-time-increases-to-4s-without-explanation

For us, we sometimes had this constant delay of 4s (which increases to 8s if we increase executors) whenever we started the job. But then we observed something which you can see in the question above: the processing time increases abruptly.

I read a lot about similar issues, but the usual answer was that something else is causing the delay. I am not really sure, but it feels like some issue with the Kafka-Spark integration. I can't say for sure.

Regards,
Biplob

Thanks & Regards
Biplob Biswas

On Tue, Jun 20, 2017 at 5:42 AM, Mal Edwin <mal.ed...@vinadionline.com> wrote:

> Hi All,
>
> I am struggling with an odd issue and would like your help in addressing
> it.
>
> *Environment*
>
> AWS Cluster (40 Spark Nodes & 4 node Kafka cluster)
> Spark Kafka Streaming submitted in Yarn cluster mode
> Kafka - Single topic, 400 partitions
> Spark 2.1 on Cloudera
> Kafka 10.0 on Cloudera
>
> We have zero messages in Kafka and are starting this spark job with 100
> executors, each with 14GB of RAM and a single executor core.
>
> The time to process 0 records (end of each batch) is 5 seconds.
>
> When we increase the executors to 400 and everything else remains the same
> except we reduce memory to 11GB, we see the time to process 0 records (end
> of each batch) increase 10 times to 50 seconds, and in some cases it goes
> to 103 seconds.
>
> Spark Streaming configs that we are setting are
>
> Batchwindow = 60 seconds
> Backpressure.enabled = true
> spark.memory.fraction=0.3 (we store more data in our own data structures)
> spark.streaming.kafka.consumer.poll.ms=1
>
> Have tried increasing driver memory to 4GB and also increased driver.cores
> to 4.
>
> If anybody has faced similar issues please provide some pointers on how to
> address this issue.
>
> Thanks a lot for your time.
>
> Regards,
> Edwin
[Spark Structured Streaming] Exception while using watermark with type of timestamp
Hi,

I am playing around with Spark Structured Streaming, and we have a use case for using it as a CEP engine.

I am reading from 3 different Kafka topics together. I want to perform windowing on this structured stream and then run some queries on this block in a sliding fashion. All of this needs to happen on event time, and I have the corresponding timestamp attached to each event.

Now I have something like this:

val windowedEvents = filteredBamEvents.withWatermark("timestamp", "10 minutes")
  .groupBy(functions.window($"timestamp", "10 minutes", "5 minutes"))

where timestamp is of type Long in my case class, as follows:

case class BAMIngestedEvent(id: String, eventName: String, eventID: String, correlationID: Seq[String], timestamp: Long)

But when I run this example with my data from Kafka, I get the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Event time must be defined on a window or a timestamp, but timestamp is of type bigint;;
EventTimeWatermark timestamp#36: bigint, interval 10 minutes
+- TypedFilter , class com.airplus.poc.edl.model.BAMIngestedEvent, [StructField(id,StringType,true), StructField(eventName,StringType,true), StructField(eventID,StringType,true), StructField(correlationID,ArrayType(StringType,true),true), StructField(timestamp,LongType,false)], newInstance(class com.airplus.poc.edl.model.BAMIngestedEvent)
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).id, true) AS id#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).eventName, true) AS eventName#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).eventID, true) AS eventID#34, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.String), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.String)), true), assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).correlationID) AS correlationID#35, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).timestamp AS timestamp#36L]
      +- MapElements , interface org.apache.spark.sql.Row, [StructField(value,StringType,true)], obj#31: com.airplus.poc.edl.model.BAMIngestedEvent
         +- DeserializeToObject createexternalrow(value#16.toString, StructField(value,StringType,true)), obj#30: org.apache.spark.sql.Row
            +- Project [value#16]
               +- Project [cast(key#0 as string) AS key#15, cast(value#1 as string) AS value#16]
                  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@222acad,kafka,List(),None,List(),None,Map(startingOffsets -> latest, subscribe -> iom, edl, kafka.bootstrap.servers -> airpluspoc-hdp-dn0.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn1.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn2.germanycentral.cloudapp.microsoftazure.de:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:204)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
    at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2850)
    at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:571)
    at com.airplus.poc.edl.CEPForBAM$.main(CEPForBAM.scala:47)
    at com.airplus.poc.edl.CEPForBAM.main(CEPForBAM.scala)

Regards,
Biplob

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-Exception-while-using-watermark-with-type-of-timestamp-tp28746.html
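The exception above complains that the watermark column is a bigint rather than a timestamp, which matches the Long field in the case class. One possible way around it, sketched here in plain Java without any Spark dependency, is to carry the event time as a java.sql.Timestamp (which, as far as I know, Spark's encoders map to a timestamp column) instead of a raw long. The class and method names below are hypothetical, and the sketch assumes the long holds epoch milliseconds; the second method covers the case where it holds epoch seconds. Inside Spark, casting the column to a timestamp type before calling withWatermark should have a similar effect, keeping in mind that a numeric value cast that way is normally read as seconds.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical helper, not part of the original post.
class EventTimeConverter {

    // Assumes the value is milliseconds since 1970-01-01T00:00:00Z.
    static Timestamp fromEpochMillis(long epochMillis) {
        return new Timestamp(epochMillis);
    }

    // Variant for producers that send whole seconds since the epoch.
    static Timestamp fromEpochSeconds(long epochSeconds) {
        return Timestamp.from(Instant.ofEpochSecond(epochSeconds));
    }

    public static void main(String[] args) {
        // Sample value borrowed from the "Time: ... ms" lines elsewhere in this archive.
        Timestamp ts = fromEpochMillis(1466176935000L);
        // Print as a UTC instant to avoid depending on the local time zone.
        System.out.println(ts.toInstant());
    }
}
```

With a field of this type in the event class, the watermark column would already be a timestamp and no cast would be needed in the query.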
Re: StreamingKmeans Spark doesn't work at all
Hi Shuai,

Thanks for the reply. As I mentioned in my mail, I also tried running the Scala example from the link I provided, and the result is the same.

Thanks & Regards
Biplob Biswas

On Mon, Jul 11, 2016 at 5:52 AM, Shuai Lin <linshuai2...@gmail.com> wrote:

> I would suggest you run the scala version of the example first, so you can
> tell whether it's a problem of the data you provided or a problem of the
> java code.
Re: StreamingKmeans Spark doesn't work at all
Hi,

I know I am asking again, but I tried running the same thing on a Mac as well, since some answers on the internet suggested it could be an issue with the Windows environment, but still nothing works.

Can anyone at least suggest whether it's a bug in Spark or something else?

Would be really grateful! Thanks a lot.

Thanks & Regards
Biplob Biswas
Re: StreamingKmeans Spark doesn't work at all
Hi,

Could anyone please look into this issue? I would really love some assistance here.

Thanks a lot.

Thanks & Regards
Biplob Biswas
StreamingKmeans Spark doesn't work at all
Hi,

I implemented the StreamingKMeans example provided on the Spark website, but in Java. The full implementation is here:

http://pastebin.com/CJQfWNvk

But I am not getting anything in the output except occasional timestamps like the one below:

---
Time: 1466176935000 ms
---

Also, I have 2 directories:

"D:\spark\streaming example\Data Sets\training"
"D:\spark\streaming example\Data Sets\test"

Inside these directories I have 1 file each, "samplegpsdata_train.txt" and "samplegpsdata_test.txt", with the training data having 500 data points and the test data 60 data points.

I am very new to Spark and any help is highly appreciated.

//---//

I have now also tried the Scala implementation available here:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

and provided the training and test files in the format specified in that file, as follows:

 * The rows of the training text files must be vector data in the form
 * `[x1,x2,x3,...,xn]`
 * Where n is the number of dimensions.
 *
 * The rows of the test text files must be labeled data in the form
 * `(y,[x1,x2,x3,...,xn])`
 * Where y is some identifier. n must be the same for train and test.

But I still get no output in my Eclipse window ... just the Time!

Can anyone seriously help me with this?

Thank you so much
Biplob Biswas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKmeans-Spark-doesn-t-work-at-all-tp27286.html
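To rule out input formatting as a cause, the two row formats quoted in the message above can be checked offline before Spark is involved at all. The following is a minimal sketch in plain Java, not the parser Spark itself uses. The class and method names are hypothetical, and it assumes the label y is numeric, although the quoted comment only calls it "some identifier".

```java
// Hypothetical offline checker for the two row formats quoted above.
class KMeansLineFormat {

    /** Holder for a parsed test row; for illustration only. */
    static final class Labeled {
        final double label;
        final double[] features;

        Labeled(double label, double[] features) {
            this.label = label;
            this.features = features;
        }
    }

    // Parses a training row of the form [x1,x2,...,xn].
    static double[] parseVector(String line) {
        String s = line.trim();
        if (s.length() < 2 || !s.startsWith("[") || !s.endsWith("]")) {
            throw new IllegalArgumentException("expected [x1,...,xn] but got: " + line);
        }
        String body = s.substring(1, s.length() - 1).trim();
        if (body.isEmpty()) {
            return new double[0];
        }
        String[] parts = body.split(",");
        double[] values = new double[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Throws NumberFormatException on a non-numeric component.
            values[i] = Double.parseDouble(parts[i].trim());
        }
        return values;
    }

    // Parses a test row of the form (y,[x1,x2,...,xn]); y is assumed numeric.
    static Labeled parseLabeled(String line) {
        String s = line.trim();
        if (s.length() < 2 || !s.startsWith("(") || !s.endsWith(")")) {
            throw new IllegalArgumentException("expected (y,[x1,...,xn]) but got: " + line);
        }
        String inner = s.substring(1, s.length() - 1);
        int comma = inner.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("missing label separator in: " + line);
        }
        double label = Double.parseDouble(inner.substring(0, comma).trim());
        return new Labeled(label, parseVector(inner.substring(comma + 1)));
    }

    public static void main(String[] args) {
        double[] v = parseVector("[1.0,2.0,3.0]");
        Labeled p = parseLabeled("(1,[1.0,2.0,3.0])");
        System.out.println(v.length + " dimensions, label " + p.label);
    }
}
```

Running every line of the training and test files through these two methods would at least show whether a row is malformed or whether the two files disagree on the number of dimensions.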
Re: Working of Streaming Kmeans
Hi,

Can anyone please explain this?

Thanks & Regards
Biplob Biswas
Working of Streaming Kmeans
Hi,

I wanted to ask a very basic question about how Streaming KMeans works.

Does the model update only during training (i.e. when the training dataset is used), or does it also update in the predictOnValues function for the test dataset?

Thanks and Regards
Biplob

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Working-of-Streaming-Kmeans-tp27268.html
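As far as the MLlib documentation describes it, the cluster centers change only through the training calls (trainOn), while predictOn and predictOnValues merely assign each point to the nearest current center. The distinction can be sketched in plain Java as below. This is not Spark's implementation: the class is hypothetical, it tracks a single center, and the weight bookkeeping is a simplification of the documented "forgetful" update c' = (c * n * a + sum of batch points) / (n * a + m), where n is the weight so far, a the decay factor and m the batch size.

```java
// Hypothetical single-centroid sketch of a forgetful streaming k-means update.
class ForgetfulCentroid {
    private final double[] center;
    private double weight;

    ForgetfulCentroid(double[] initialCenter, double initialWeight) {
        this.center = initialCenter.clone();
        this.weight = initialWeight;
    }

    // Returns a copy so callers cannot modify the model by accident.
    double[] center() {
        return center.clone();
    }

    // Prediction side: only reads the center, never changes it.
    double squaredDistanceTo(double[] point) {
        double sum = 0.0;
        for (int d = 0; d < center.length; d++) {
            double diff = point[d] - center[d];
            sum += diff * diff;
        }
        return sum;
    }

    // Training side: blends the old center with the points of a new batch.
    // decay = 1.0 keeps all history, decay = 0.0 forgets it completely.
    void update(double[][] batch, double decay) {
        int m = batch.length;
        if (m == 0) {
            return; // nothing to learn from an empty batch
        }
        double discounted = weight * decay;
        double total = discounted + m;
        for (int d = 0; d < center.length; d++) {
            double batchSum = 0.0;
            for (double[] point : batch) {
                batchSum += point[d];
            }
            center[d] = (center[d] * discounted + batchSum) / total;
        }
        weight = total; // simplified bookkeeping, an assumption of this sketch
    }

    public static void main(String[] args) {
        ForgetfulCentroid c = new ForgetfulCentroid(new double[] {0.0, 0.0}, 1.0);
        System.out.println("distance before training: " + c.squaredDistanceTo(new double[] {3.0, 4.0}));
        c.update(new double[][] {{2.0, 2.0}}, 1.0);
        System.out.println("center after training: " + c.center()[0] + ", " + c.center()[1]);
    }
}
```

In this sketch, calling squaredDistanceTo any number of times leaves the center untouched, and only update moves it, which mirrors the training/prediction split described above.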
Running JavaBased Implementation of StreamingKmeans Spark
Hi,

Something is wrong with my Spark mailing list subscription, so I can't see the responses properly on Nabble. I have subscribed from a different ID, which hopefully solves it, and I am posting my question again here.

I implemented the StreamingKMeans example provided on the Spark website, but in Java. The full implementation is here:

http://pastebin.com/CJQfWNvk

But I am not getting anything in the output except occasional timestamps like the one below:

---
Time: 1466176935000 ms
---

Also, I have 2 directories:

"D:\spark\streaming example\Data Sets\training"
"D:\spark\streaming example\Data Sets\test"

Inside these directories I have 1 file each, "samplegpsdata_train.txt" and "samplegpsdata_test.txt", with the training data having 500 data points and the test data 60 data points.

I have already tried adding the data files after starting the code, but there is still no output. I am also not getting any exception, so it's hard for me to debug.

I am very new to Spark and any help is highly appreciated.

Thank you so much
Biplob Biswas
Re: Running JavaBased Implementation of StreamingKmeans Spark
Hi,

I tried doing that, but even then I couldn't see any results. I started the program and added the files later.

Thanks & Regards
Biplob Biswas

On Sat, Jun 25, 2016 at 2:19 AM, Jayant Shekhar <jayantbaya...@gmail.com> wrote:

> Hi Biplop,
>
> Can you try adding new files to the training/test directories after you
> have started your streaming application! Especially the test directory as
> you are printing your predictions.
Running JavaBased Implementation of StreamingKmeans Spark
Hi,

I implemented the StreamingKMeans example provided on the Spark website, but in Java. The full implementation is here:

http://pastebin.com/CJQfWNvk

But I am not getting anything in the output except occasional timestamps like the one below:

---
Time: 1466176935000 ms
---

Also, I have 2 directories:

"D:\spark\streaming example\Data Sets\training"
"D:\spark\streaming example\Data Sets\test"

Inside these directories I have 1 file each, "samplegpsdata_train.txt" and "samplegpsdata_test.txt", with the training data having 500 data points and the test data 60 data points.

I am very new to Spark and any help is highly appreciated.

Thank you so much
Biplob Biswas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-JavaBased-Implementation-of-StreamingKmeans-Spark-tp27225.html
Re: Running JavaBased Implementationof StreamingKmeans
Hi,

Can someone please look into this and tell me what's wrong, and why I am not getting any output?

Thanks & Regards
Biplob Biswas
Re: Running JavaBased Implementationof StreamingKmeans
Hi,

Thanks for that input. I tried doing that, but apparently that's not working either. I thought I was having problems with my Spark installation, so I ran a simple word count, and that works, so I am not really sure what the problem is now.

Is my translation of the Scala code correct? I don't understand Scala syntax very well, so I wrote my own implementation of streaming k-means in Java, and I am hoping that it's correct.

Thanks & Regards
Biplob Biswas

On Sun, Jun 19, 2016 at 3:23 AM, Akhil Das <ak...@hacked.work> wrote:

> SparkStreaming does not pick up old files by default, so you need to start
> your job with master=local[2] (It needs 2 or more working threads, 1 to
> read the files and the other to do your computation) and once the job start
> to run, place your input files in the input directories and you can see
> them being picked up by sparkstreaming.
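Akhil's advice to place the input files in the directories only after the job is running can be sketched as a small helper. This is a plain-Java illustration under stated assumptions, not something from the thread: the class name, method name and directory layout are hypothetical, and it assumes the staging and watched directories are on the same filesystem so that the move can be atomic. My understanding is that the file stream decides what counts as new from the file's modification time, which is why the sketch writes a fresh file instead of moving an old one.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for dropping a new input file into a watched directory.
class StreamingFileDropper {

    // Writes the lines to a temporary file in stagingDir, then moves the
    // finished file into watchedDir so a reader never sees a half-written file.
    static Path drop(Path stagingDir, Path watchedDir, String fileName, List<String> lines)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(watchedDir);
        Path tmp = Files.createTempFile(stagingDir, "drop-", ".tmp");
        Files.write(tmp, lines); // freshly written, so the modification time is "now"
        Path target = watchedDir.resolve(fileName);
        try {
            return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback when the two directories are not on the same filesystem.
            return Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        // Uses a throwaway temp directory; a real run would point at the
        // training or test directory while the streaming job is already running.
        Path base = Files.createTempDirectory("streaming-demo");
        Path dropped = drop(base.resolve("staging"), base.resolve("training"),
                "batch-1.txt", Arrays.asList("[1.0,2.0]", "[3.0,4.0]"));
        System.out.println("dropped " + dropped);
    }
}
```

Each call should use a new file name, since a file that reuses an earlier name may be treated as already seen.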
Re: Running JavaBased Implementationof StreamingKmeans
Hi,

I tried local[*] and local[2] and the result is the same. I don't really understand the problem here.
How can I confirm that the files are read properly?

Thanks & Regards
Biplob Biswas

On Sat, Jun 18, 2016 at 5:59 PM, Akhil Das <ak...@hacked.work> wrote:

> Looks like you need to set your master to local[2] or local[*]
>
> On Sat, Jun 18, 2016 at 4:54 PM, Biplob Biswas <revolutioni...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I implemented the streamingKmeans example provided in the spark website
>> but
>> in Java.
>> The full implementation is here,
>>
>> http://pastebin.com/CJQfWNvk
>>
>> But i am not getting anything in the output except occasional timestamps
>> like one below:
>>
>> ---
>> Time: 1466176935000 ms
>> ---
>>
>> Also, i have 2 directories:
>> "D:\spark\streaming example\Data Sets\training"
>> "D:\spark\streaming example\Data Sets\test"
>>
>> and inside these directories i have 1 file each "samplegpsdata_train.txt"
>> and "samplegpsdata_test.txt" with training data having 500 datapoints and
>> test data with 60 datapoints.
>>
>> I am very new to the spark systems and any help is highly appreciated.
>>
>> Thank you so much
>> Biplob Biswas
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Running-JavaBased-Implementationof-StreamingKmeans-tp27192.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>
> --
> Cheers!
Running JavaBased Implementation of StreamingKmeans
Hi,

I implemented the StreamingKMeans example provided on the Spark website, but in Java. The full implementation is here:

http://pastebin.com/CJQfWNvk

But I am not getting anything in the output except occasional timestamps like the one below:

---
Time: 1466176935000 ms
---

Also, I have 2 directories:

"D:\spark\streaming example\Data Sets\training"
"D:\spark\streaming example\Data Sets\test"

Inside these directories I have 1 file each, "samplegpsdata_train.txt" and "samplegpsdata_test.txt", with the training data having 500 data points and the test data 60 data points.

I am very new to Spark and any help is highly appreciated.

Thanks & Regards
Biplob Biswas
Running JavaBased Implementationof StreamingKmeans
Hi,

I implemented the StreamingKMeans example provided on the Spark website, but in Java. The full implementation is here:

http://pastebin.com/CJQfWNvk

But I am not getting anything in the output except occasional timestamps like the one below:

---
Time: 1466176935000 ms
---

Also, I have 2 directories:

"D:\spark\streaming example\Data Sets\training"
"D:\spark\streaming example\Data Sets\test"

Inside these directories I have 1 file each, "samplegpsdata_train.txt" and "samplegpsdata_test.txt", with the training data having 500 data points and the test data 60 data points.

I am very new to Spark and any help is highly appreciated.

Thank you so much
Biplob Biswas

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-JavaBased-Implementationof-StreamingKmeans-tp27190.html