Re: RDD order preservation through transformations
I think this is one of the conceptual differences in Spark compared to other systems: there is no indexing in plain RDDs. This was the thread with Ankit:

Yes. So order preservation cannot be guaranteed in the case of failure. I am also not sure whether partitions themselves are ordered; can you get the same sequence of partitions in mapPartitions?

On 13 Sep 2017 19:54, "Ankit Maloo" wrote:
> RDDs are fault tolerant because they can be recomputed using the DAG without storing the intermediate RDDs.
>
> On 13-Sep-2017 11:16 PM, "Suzen, Mehmet" wrote:
>> But what happens if one of the partitions fails? How does fault tolerance recover elements in the other partitions?
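Not from the thread, just a small sketch (assuming a plain local SparkSession) of one way to inspect partition ordering empirically: mapPartitionsWithIndex tags every element with the index of the partition that produced it, so you can compare the per-partition sequences across runs or after a recomputation.

import org.apache.spark.sql.SparkSession

object PartitionOrderCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("partition-order-check").getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(1 to 20, numSlices = 4)

    // Tag each element with the index of the partition that produced it.
    val tagged = rdd.mapPartitionsWithIndex { (partIdx, iter) =>
      iter.map(x => (partIdx, x))
    }

    // collect() returns partitions in index order, so this shows the
    // per-partition sequences; run it twice (or after a failure) to compare.
    tagged.collect().foreach(println)

    spark.stop()
  }
}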
Re: compile error: No classtag available while calling RDD.zip()
Thanks for your reply! Actually, it is OK when I use RDD.zip() like this:

1 def zipDatasets(m: Dataset[String], n: Dataset[Int]) = {
2   m.sparkSession.createDataset(m.rdd.zip(n.rdd));
3 }

But in my project the element types of the Datasets are designated by the caller, so I introduce type parameters X and Y:

1 def zipDatasets[X: Encoder, Y: Encoder](m: Dataset[X], n: Dataset[Y]) = {
2   m.sparkSession.createDataset(m.rdd.zip(n.rdd));
3 }

This reports an error because Y is unknown to the compiler, while the compiler needs ClassTag information for Y. Now I have no idea how to fix it.

Regards,
bluejoe

From: Anastasios Zouzias
Reply-To:
Date: Thursday, September 14, 2017, 2:10 AM
To: bluejoe
Cc: user
Subject: Re: compile error: No classtag available while calling RDD.zip()

Hi there,

If it is OK with you to work with DataFrames, you can do https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, LongType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append "rowid" column of type Long
val schema = df.schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(rddWithId.map { case (row, index) =>
  Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

// Show results
dfZippedWithId.show

Best,
Anastasios

On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 wrote:

Hello. Since Dataset has no zip(...) method, I wrote the following code to zip two datasets:

1 def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
2   val rdd = m.rdd.zip(n.rdd);
3   import spark.implicits._
4   spark.createDataset(rdd);
5 }

However, in the m.rdd.zip(…) call a compile error is reported: No ClassTag available for Y.

I know this error can be corrected when I declare Y as a ClassTag like this:

1 def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …

But this will make line 5 report a new error: Unable to find encoder for type stored in a Dataset.

Now I have no idea how to solve this problem. How can I declare Y as both an Encoder and a ClassTag?

Many thanks!

Best regards,
bluejoe

--
-- Anastasios Zouzias
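Not part of the original thread, but one possible way out (a sketch, untested against the poster's project): the Encoder trait itself exposes a ClassTag via clsTag, so a ClassTag[Y] can be materialized from the Encoder[Y] context bound before calling zip, without adding a second context bound on Y.

import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders}

object ZipDatasets {
  // X and Y only need Encoder context bounds; the ClassTag that
  // RDD.zip requires for Y is derived from the Encoder.
  def zipDatasets[X: Encoder, Y: Encoder](m: Dataset[X], n: Dataset[Y]): Dataset[(X, Y)] = {
    implicit val yTag: ClassTag[Y] = implicitly[Encoder[Y]].clsTag
    // Encoders.tuple builds the Encoder[(X, Y)] needed by createDataset.
    implicit val pairEnc: Encoder[(X, Y)] =
      Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
    m.sparkSession.createDataset(m.rdd.zip(n.rdd))
  }
}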
Re-sharded kinesis stream starts generating warnings after kinesis shard numbers were doubled
Has anyone seen the following warnings in the log after a kinesis stream has been re-sharded?

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask WARN Cannot get the shard for this ProcessTask, so duplicate KPL user records in the event of resharding will not be dropped during deaggregation of Amazon Kinesis records.

com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy WARN Cannot find the shard given the shardId shardId-0599
How the order of chained paths in spark.(driver/executor).extraClassPath matters
Let's say I have a chained path in spark.driver.extraClassPath/spark.executor.extraClassPath such as /path1/*:/path2/*, and I have different versions of the same jar under those two directories. How does Spark pick which version of the jar to use? Does it come from /path1/*? Thanks.
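Not an answer from the list, just a hedged note plus a sketch: for JVM class path entries, earlier entries win when the same class appears in more than one place, so a class present under both /path1/* and /path2/* should normally resolve from /path1 (although the expansion order of jars within a single wildcard directory is unspecified). One way to verify at runtime which jar actually supplied a class; com.example.Foo is a hypothetical class assumed to exist in both jars:

// Run inside spark-shell (or log from a job) on the driver/executors to see
// which jar a given class was actually loaded from.
val loadedFrom = Class.forName("com.example.Foo")   // hypothetical class present in both jars
  .getProtectionDomain
  .getCodeSource
  .getLocation

println(s"com.example.Foo was loaded from: $loadedFrom")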
Should I use DStream or Structured Streaming to transfer data from source to sink and then back from sink to source?
Hi All, I am trying to read data from Kafka, insert into Mongo, and then read from Mongo and insert back into Kafka. I went with the structured streaming approach first; however, I believe I am making some naive error because my map operations are not getting invoked. The pseudo code looks like this:

Dataset resultDataSet = jsonDataset.mapPartitions(insertIntoMongo).mapPartitions(readFromMongo);
StreamingQuery query = resultDataSet.trigger(ProcessingTime(1000)).format("kafka").start();
query.awaitTermination();

The mapPartitions in this code is not getting executed. Is this because I am not calling any action on my streaming dataset? In the DStream case I used to call foreachRDD and it worked well, so how do I do this using structured streaming? Thanks!
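Not from the thread: a minimal sketch (assuming Spark 2.2 with the spark-sql-kafka-0-10 connector, and placeholder broker/topic names) of a Kafka-to-Kafka structured streaming pipeline. The transformations, including mapPartitions, only execute once start() is called and the query is running; the Kafka sink also requires a string or binary column named "value" and a checkpoint location. The enrichRecords function is a placeholder for the Mongo write/read logic from the question.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-roundtrip").getOrCreate()
import spark.implicits._

// Source: the Kafka value arrives as binary, cast it to String.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "input-topic")                  // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]

// Per-partition transformation; stand-in for the insert/read-Mongo steps.
def enrichRecords(it: Iterator[String]): Iterator[String] = it.map(_.toUpperCase)
val transformed = input.mapPartitions(enrichRecords)

// Sink: the Kafka sink expects a column named "value".
val query = transformed.toDF("value").writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-roundtrip-chk")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

query.awaitTermination()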
Re: compile error: No classtag available while calling RDD.zip()
Hi there,

If it is OK with you to work with DataFrames, you can do https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, LongType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append "rowid" column of type Long
val schema = df.schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(rddWithId.map { case (row, index) =>
  Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

// Show results
dfZippedWithId.show

Best,
Anastasios

On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 wrote:
> Hello. Since Dataset has no zip(...) method, I wrote the following code to zip two datasets:
>
> 1 def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
> 2   val rdd = m.rdd.zip(n.rdd);
> 3   import spark.implicits._
> 4   spark.createDataset(rdd);
> 5 }
>
> However, in the m.rdd.zip(…) call a compile error is reported: No ClassTag available for Y.
>
> I know this error can be corrected when I declare Y as a ClassTag like this:
>
> 1 def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …
>
> But this will make line 5 report a new error: Unable to find encoder for type stored in a Dataset.
>
> Now I have no idea how to solve this problem. How can I declare Y as both an Encoder and a ClassTag?
>
> Many thanks!
>
> Best regards,
> bluejoe

--
-- Anastasios Zouzias
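A follow-on sketch (not from the reply above): to actually zip two DataFrames with this rowid approach, you can append a rowid to each side with zipWithIndex and then join on it, which stays at the DataFrame level and sidesteps the ClassTag issue entirely. Variable and column names here are placeholders.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append a row id based on RDD zipWithIndex.
def withRowId(df: DataFrame): DataFrame = {
  val schema = StructType(df.schema.fields :+ StructField("rowid", LongType, nullable = false))
  val rdd = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(rdd, schema)
}

// "Zip" two DataFrames of the same length by joining on the row id.
def zipByRowId(left: DataFrame, right: DataFrame): DataFrame =
  withRowId(left)
    .join(withRowId(right).withColumnRenamed("rowid", "rowid_r"), col("rowid") === col("rowid_r"))
    .drop("rowid", "rowid_r")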
Re: RDD order preservation through transformations
I'm wondering why you need order preserved. We've had situations where keeping the source as an artificial field in the dataset was important and I had to run contortions to inject that (in that case the data source had no unique key). Is this similar?

On 13 September 2017 at 10:46, Suzen, Mehmet wrote:
> But what happens if one of the partitions fails? How does fault tolerance recover elements in the other partitions?
>
> On 13 Sep 2017 18:39, "Ankit Maloo" wrote:
>
>> AFAIK, the order of an RDD is maintained within a partition for map operations. There is no way a map operation can change the sequence within a partition, as the partition is local and computation happens one record at a time.
>>
>> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet" wrote:
>>
>> I think the order has no meaning in RDDs; see this post, especially the zip methods:
>> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
Re: RDD order preservation through transformations
But what happens if one of the partitions fails? How does fault tolerance recover elements in the other partitions?

On 13 Sep 2017 18:39, "Ankit Maloo" wrote:
> AFAIK, the order of an RDD is maintained within a partition for map operations. There is no way a map operation can change the sequence within a partition, as the partition is local and computation happens one record at a time.
>
> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet" wrote:
>
> I think the order has no meaning in RDDs; see this post, especially the zip methods:
> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
Re: RDD order preservation through transformations
AFAIK, the order of an RDD is maintained within a partition for map operations. There is no way a map operation can change the sequence within a partition, as the partition is local and computation happens one record at a time.

On 13-Sep-2017 9:54 PM, "Suzen, Mehmet" wrote:

I think the order has no meaning in RDDs; see this post, especially the zip methods:
https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
Re: Chaining Spark Streaming Jobs
Thanks for your suggestion Vincent. I do not have much experience with akka as such; I will explore this option.

On Tue, Sep 12, 2017 at 11:01 PM, vincent gromakowski <vincent.gromakow...@gmail.com> wrote:
> What about chaining with akka or akka streams and the fair scheduler?
>
> On 13 Sep 2017 01:51, "Sunita Arvind" wrote:
>
> Hi Michael,
>
> I am wondering what I am doing wrong. I get an error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>     at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>     at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
>     at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
>     at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
>     at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
>     at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
> I tried specifying the schema as well. Here is my code:
>
> object Aggregates {
>
>   val aggregation =
>     """select sum(col1), sum(col2), id, first(name)
>        from enrichedtb
>        group by id
>     """.stripMargin
>
>   def aggregator(conf: Config) = {
>     implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>     implicit val sqlctx = spark.sqlContext
>     printf("Source path is" + conf.getString("source.path"))
>     val schemadf = sqlctx.read.parquet(conf.getString("source.path"))  // Added this as it was complaining about schema.
>     val df = spark.readStream.format("parquet").option("inferSchema", true).schema(schemadf.schema).load(conf.getString("source.path"))
>     df.createOrReplaceTempView("enrichedtb")
>     val res = spark.sql(aggregation)
>     res.writeStream.format("parquet").outputMode("append").option("checkpointLocation", conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
>     val mainconf = ConfigFactory.load()
>     val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>     print(conf.toString)
>     aggregator(conf)
>   }
> }
>
> I tried to extract the schema from a static read of the input path and provided it to the readStream API.
> With that, I get this error:
>
>     at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>     at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster all paths point to S3. On my laptop, they all point to the local filesystem.
>
> I am using Spark 2.2.0.
>
> Appreciate your help.
>
> regards
> Sunita
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust wrote:
>
>> If you use structured streaming and the file sink, you can have a subsequent stream read using the file source. This will maintain exactly-once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job that consumes protocol buffer encoded real time logs from a Kafka cluster on premise. My spark application runs on EMR (aws)
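Two hedged observations from outside the thread, with a sketch (placeholder paths and fields, Spark 2.2 APIs). First, a streaming file source needs an explicit schema; the code above works around that by reading the path statically first, but an explicit StructType avoids the extra read. Second, the truncated stack trace goes through UnsupportedOperationChecker, which is the check that rejects streaming plans whose output mode they do not support: a grouped aggregation without an event-time watermark cannot use append mode, so the second error may simply be the aggregation/output-mode combination rather than the schema.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("agg-stream").getOrCreate()

// Explicit schema for the streaming file source (placeholder fields).
val schema = StructType(Seq(
  StructField("id",   StringType),
  StructField("name", StringType),
  StructField("col1", DoubleType),
  StructField("col2", DoubleType)))

val df = spark.readStream.schema(schema).parquet("/path/to/source")   // placeholder path
df.createOrReplaceTempView("enrichedtb")

val res = spark.sql(
  "select id, first(name) as name, sum(col1) as s1, sum(col2) as s2 from enrichedtb group by id")

// A grouped aggregation without a watermark cannot use append mode, and the
// file (parquet) sink only supports append. To see results, either use a sink
// that supports complete mode (console/memory), or add an event-time watermark
// to the aggregation and keep the parquet sink in append mode.
res.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation", "/path/to/chk")   // placeholder path
  .start()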
Re: RDD order preservation through transformations
I think the order has no meaning in RDDs; see this post, especially the zip methods:
https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
RDD order preservation through transformations
Hi,

I'm a beginner using Spark with Scala and I'm having trouble understanding ordering in RDDs. I understand that RDDs are ordered (as they can be sorted) but that some transformations don't preserve order. How can I know which transformations preserve order and which don't?

Regarding map, for instance, this StackOverflow answer says map preserves order, but this answer on this mailing list implies it doesn't. The scaladoc doesn't say explicitly. Which is it?

https://stackoverflow.com/a/31525843
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-ordering-gets-scrambled-tp5062p6482.html
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

--
Johan Grande
Sopra Steria for Orange
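Not part of the thread: one small experiment (a sketch, not an authoritative answer) for checking empirically whether a particular transformation kept the order you started with. Tag each element with its original position via zipWithIndex, apply the transformation under test to the values, and compare the collected index sequence against its sorted form.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("order-check").getOrCreate()
val sc = spark.sparkContext

val original = Vector("a", "b", "c", "d", "e", "f", "g", "h")
val rdd = sc.parallelize(original, numSlices = 3)

// Remember each element's original position before transforming.
val indexed = rdd.zipWithIndex()          // RDD[(String, Long)]

// Apply the transformation under test to the value only.
val transformed = indexed.map { case (v, i) => (i, v.toUpperCase) }

// If the transformation preserved order, collecting without sorting
// yields the same index sequence as sorting those indices does.
val collected = transformed.collect().map(_._1).toSeq
println(s"order preserved by this run: ${collected == collected.sorted}")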
Re: Minimum cost flow problem solving in Spark
You might be interested in "Maximum Flow implementation on Spark GraphX", done by a Colorado School of Mines grad student a couple of years ago: http://datascienceassn.org/2016-01-27-maximum-flow-implementation-spark-graphx

From: Swapnil Shinde
To: user@spark.apache.org; d...@spark.apache.org
Sent: Wednesday, September 13, 2017 9:41 AM
Subject: Minimum cost flow problem solving in Spark

Hello

Has anyone used Spark to solve minimum cost flow problems? I am quite new to combinatorial optimization algorithms, so any help, suggestions, or library pointers are much appreciated.

Thanks
Swapnil
Minimum cost flow problem solving in Spark
Hello

Has anyone used Spark to solve minimum cost flow problems? I am quite new to combinatorial optimization algorithms, so any help, suggestions, or library pointers are much appreciated.

Thanks
Swapnil
HiveThriftserver does not seem to respect partitions
Hi folks,

I have created a table in the following manner:

CREATE EXTERNAL TABLE IF NOT EXISTS rum_beacon_partition (
  list of columns
)
COMMENT 'User Infomation'
PARTITIONED BY (account_id String, product String, group_id String, year String, month String, day String)
STORED AS PARQUET
LOCATION '/stream2store/nt_tp_collation'

I then ran MSCK REPAIR TABLE to generate the partition information. I think the partitions got generated correctly; here is a query and its output:

show table extended like 'rum_beacon_partition' partition(account_id='',product='rum',group_id='',year='2017',month='09',day='12')

location:ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=09/day=12

However, it does appear that when I issue a SQL query, the predicates do not correctly limit the files touched:

explain extended select uri from rum_beacon_partition where account_id='' and product='rum' and group_id='' and year='2017' and month='09' limit 2

produces output that seems to indicate that every file is being touched (unless I'm misreading the output). It also crashes my filesystem, so I suspect there is some truth to it.

The optimized logical plan looks fine, I think:

== Optimized Logical Plan ==
Limit 2
 Project [uri#16519]
  Filter (account_id#16511 = ) && (product#16512 = rum)) && (group_id#16513 = )) && (year#16514 = 2017)) && (month#16515 = 09))

But in the physical plan it seems that a ton of files are touched (both across account and date partitions):

Scan ParquetRelation[ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=16,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=17,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=18,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=19,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=20,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=21,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=22,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=23,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=24,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=25,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=26,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=27,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=28,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=29,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=30,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=31,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=06/day=01,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=06/day=02

I am hoping someone can offer debugging tips / advice on what to look for in the logs. I'm on a pretty old version of Spark (1.5.2) but this seems like something that I'm doing wrong.
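Not from the thread, and only a guess for a release this old: on Spark 1.5.x, metastore Parquet tables are converted to the native Parquet data source by default, and that conversion path has been known to enumerate partitions itself instead of pushing the partition predicates down to the metastore. A hedged thing to try is disabling the conversion and re-checking the physical plan; the table name below is the one from the question, and the '...' values stand in for the redacted account/group ids.

// Spark 1.5.x: let the Hive path (with metastore partition pruning) handle
// the table instead of the converted native Parquet relation.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

// Re-check which partitions the plan actually touches.
sqlContext.sql(
  """explain extended
    |select uri from rum_beacon_partition
    |where account_id = '...' and product = 'rum' and group_id = '...'
    |      and year = '2017' and month = '09'
    |limit 2""".stripMargin).collect().foreach(println)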
compile error: No classtag available while calling RDD.zip()
Hello. Since Dataset has no zip(...) method, I wrote the following code to zip two datasets:

1 def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
2   val rdd = m.rdd.zip(n.rdd);
3   import spark.implicits._
4   spark.createDataset(rdd);
5 }

However, in the m.rdd.zip(…) call a compile error is reported: No ClassTag available for Y.

I know this error can be corrected when I declare Y as a ClassTag like this:

1 def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …

But this will make line 5 report a new error: Unable to find encoder for type stored in a Dataset.

Now I have no idea how to solve this problem. How can I declare Y as both an Encoder and a ClassTag?

Many thanks!

Best regards,
bluejoe
[Structured Streaming] Multiple sources best practice/recommendation
Hi,

I have different files being dumped on S3 and I want to ingest them and join them. Which sounds better to you: one "directory" for all, or one per file format?

If I have one directory for all, can I get some metadata about the file, like its name? If I use multiple directories, how can I have multiple "listeners"?

Thanks
jg
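Not from the thread: a sketch of both options (placeholder paths and schemas, Spark 2.x APIs). input_file_name() attaches the source file of each row, which covers the metadata question; with one directory per format you simply create one readStream per directory and either union them (if you can shape them to the same columns) or run them as separate queries.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("multi-source").getOrCreate()

val schemaA = new StructType().add("id", StringType).add("value", DoubleType)   // placeholder
val schemaB = new StructType().add("id", StringType).add("value", DoubleType)   // placeholder

// One "listener" per directory/format, each row tagged with its source file.
val streamA = spark.readStream.schema(schemaA).json("s3://bucket/format-a/")     // placeholder path
  .withColumn("source_file", input_file_name())
val streamB = spark.readStream.schema(schemaB).csv("s3://bucket/format-b/")      // placeholder path
  .withColumn("source_file", input_file_name())

// Same columns, so the two streams can be unioned and processed together...
val combined = streamA.union(streamB)

// ...or kept as separate queries if the processing differs.
val query = combined.writeStream
  .format("parquet")
  .option("checkpointLocation", "s3://bucket/chk/")
  .start("s3://bucket/output/")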
[Spark Dataframe] How can I write a correct filter so the Hive table partitions are pruned correctly
Hi Spark users,

I've got an issue where I wrote a filter on a Hive table using DataFrames and, despite setting spark.sql.hive.metastorePartitionPruning=true, no partitions are being pruned.

In short, doing this:

table.filter("partition=x or partition=y")

results in Spark fetching all partition metadata from the Hive metastore and doing the filtering after fetching the partitions. On the other hand, if my filter is "simple":

table.filter("partition=x")

Spark makes a call to the metastore that passes along the filter and fetches just the partitions it needs.

Our case is a table with a lot of partitions, where the calls that return all partitions take minutes and also cause us memory issues. Is this a bug, or is there a better way of doing the filter call?

Thanks,
Patrick

PS: Sorry for crossposting; I wasn't sure if the user list was the correct place to ask, and I understood I should go via Stack Overflow first, so my question is also there in more detail:
https://stackoverflow.com/questions/46152526/how-should-i-configure-spark-to-correctly-prune-hive-metastore-partitions
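Not from the thread, just a hedged workaround sketch for as long as the pruning behaves this way: splitting the disjunction into one simple equality filter per partition value and unioning the results lets each filter be pushed to the metastore on its own. The table and column names below are the placeholders from the question.

import org.apache.spark.sql.DataFrame

val table: DataFrame = spark.table("my_partitioned_table")   // placeholder table name

// Instead of: table.filter("partition = 'x' or partition = 'y'"),
// push each simple predicate separately and union the results.
val wanted = Seq("x", "y")
val pruned = wanted
  .map(v => table.filter(s"partition = '$v'"))
  .reduce(_ union _)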
Re: Chaining Spark Streaming Jobs
What about chaining with akka or akka streams and the fair scheduler?

On 13 Sep 2017 01:51, "Sunita Arvind" wrote:

Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
    at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
    at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
    at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook

I tried specifying the schema as well. Here is my code:

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
       from enrichedtb
       group by id
    """.stripMargin

  def aggregator(conf: Config) = {
    implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is" + conf.getString("source.path"))
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))  // Added this as it was complaining about schema.
    val df = spark.readStream.format("parquet").option("inferSchema", true).schema(schemadf.schema).load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)
    res.writeStream.format("parquet").outputMode("append").option("checkpointLocation", conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    print(conf.toString)
    aggregator(conf)
  }
}

I tried to extract the schema from a static read of the input path and provided it to the readStream API.
With that, I get this error:

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster all paths point to S3. On my laptop, they all point to the local filesystem.

I am using Spark 2.2.0.

Appreciate your help.

regards
Sunita

On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust wrote:
> If you use structured streaming and the file sink, you can have a subsequent stream read using the file source. This will maintain exactly-once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job that consumes protocol buffer encoded real time logs from a Kafka cluster on premise. My spark application runs on EMR (aws) and persists data onto s3. Before I persist, I need to strip the header and convert the protobuffer to parquet (I use sparksql-scalapb to convert from Protobuff to Spark.sql.Row). I need to persist raw logs as is. I can continue the enrichment on the same dataframe after persisting the raw data; however, in order to modularize I am planning to have a separate job which picks up the raw data and