Re: disable spark disk cache
Hi Andrey,

Below is the description of MEMORY_ONLY from https://spark.apache.org/docs/latest/rdd-programming-guide.html:

"Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level."

Just curious: how do you know Spark will use the disk even when the MEMORY_ONLY option is chosen?

Cheers,
Hien

On Sun, Mar 3, 2019 at 1:47 PM Andrey Dudin wrote:
> Hello everyone,
>
> Is there a way to prevent caching data to disk even if the memory (RAM) runs out?
> As I know, Spark will use the disk even if I use MEMORY_ONLY. How do I disable this mechanism? I want to get something like an out-of-memory exception if the memory (RAM) runs out.
>
> Thanks,
> Andrey

-- Regards,
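For what it's worth, the storage level is explicit in the persist call, so the recompute-versus-spill behavior can be chosen directly. A minimal sketch (assumes a running Spark application; `someRdd` is a placeholder name):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY: partitions that do not fit in RAM are simply not cached
// and are recomputed from the lineage when needed -- the cache itself
// writes nothing to local disk.
val cachedInMemory = someRdd.persist(StorageLevel.MEMORY_ONLY)

// MEMORY_AND_DISK: partitions that do not fit in RAM are spilled to
// local disk instead of being recomputed.
val cachedWithSpill = someRdd.persist(StorageLevel.MEMORY_AND_DISK)
```

Note that this only controls caching: shuffle intermediate files are still written to local disk regardless of the storage level, so observing disk I/O under MEMORY_ONLY does not necessarily mean cached partitions were spilled.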
Re: to_avro and from_avro not working with struct type in spark 2.4
Thanks for the answer. As far as the next step goes, I am thinking of writing out the dfKV dataframe to disk and then using the Avro APIs to read the data. This smells like a bug somewhere.

Cheers,
Hien

On Thu, Feb 28, 2019 at 4:02 AM Gabor Somogyi wrote:
> No, just take a look at the schema of dfStruct since you've converted its value column with to_avro:
>
> scala> dfStruct.printSchema
> root
> |-- id: integer (nullable = false)
> |-- name: string (nullable = true)
> |-- age: integer (nullable = false)
> |-- value: struct (nullable = false)
> |    |-- name: string (nullable = true)
> |    |-- age: integer (nullable = false)

-- Regards,
Re: to_avro and from_avro not working with struct type in spark 2.4
Thanks for looking into this. Does this mean string fields should always be nullable?

You are right that the result is not yet correct and further digging is needed :(

On Wed, Feb 27, 2019 at 1:19 AM Gabor Somogyi wrote:
> Hi,
>
> I was dealing with avro stuff lately and most of the time it has something to do with the schema.
> One thing I've pinpointed quickly (where I was struggling also) is that the name field should be nullable, but the result is not yet correct, so further digging is needed...
>
> scala> val expectedSchema = StructType(Seq(StructField("name", StringType, true), StructField("age", IntegerType, false)))
> expectedSchema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,false))
>
> scala> val avroTypeStruct = SchemaConverters.toAvroType(expectedSchema).toString
> avroTypeStruct: String = {"type":"record","name":"topLevelRecord","fields":[{"name":"name","type":["string","null"]},{"name":"age","type":"int"}]}
>
> scala> dfKV.select(from_avro('value, avroTypeStruct)).show
> +------------------------+
> |from_avro(value, struct)|
> +------------------------+
> |         [Mary Jane, 25]|
> |         [Mary Jane, 25]|
> +------------------------+
>
> BR,
> G

-- Regards,
to_avro and from_avro not working with struct type in spark 2.4
Hi,

I ran into a pretty weird issue with to_avro and from_avro where it was not able to parse the data in a struct correctly. Please see the simple and self-contained example below. I am using Spark 2.4. I am not sure if I missed something.

This is how I start the spark-shell on my Mac:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0

import org.apache.spark.sql.types._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions._

spark.version

val df = Seq((1, "John Doe", 30), (2, "Mary Jane", 25)).toDF("id", "name", "age")

val dfStruct = df.withColumn("value", struct("name", "age"))

dfStruct.show
dfStruct.printSchema

val dfKV = dfStruct.select(to_avro('id).as("key"), to_avro('value).as("value"))

val expectedSchema = StructType(Seq(StructField("name", StringType, false), StructField("age", IntegerType, false)))

val avroTypeStruct = SchemaConverters.toAvroType(expectedSchema).toString

val avroTypeStr = s"""
  |{
  |  "type": "int",
  |  "name": "key"
  |}
""".stripMargin

dfKV.select(from_avro('key, avroTypeStr)).show

// output
+-------------------+
|from_avro(key, int)|
+-------------------+
|                  1|
|                  2|
+-------------------+

dfKV.select(from_avro('value, avroTypeStruct)).show

// output
+------------------------+
|from_avro(value, struct)|
+------------------------+
|                   [, 9]|
|                   [, 9]|
+------------------------+

Please help and thanks in advance.

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
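One detail worth spelling out: the name column produced by toDF is nullable, so to_avro writes the struct with the writer schema below (this is exactly what SchemaConverters.toAvroType returns for a nullable StringType, per the output quoted earlier in the thread). Reading those bytes back with an expected schema that declares name as a plain non-nullable "string" is therefore misaligned with what was written, which would explain the garbage [, 9] rows:

```json
{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    {"name": "name", "type": ["string", "null"]},
    {"name": "age",  "type": "int"}
  ]
}
```

Declaring the name field as nullable (true) in expectedSchema makes the reader schema produce the same union type.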
Re: How to register custom structured streaming source
Hi Farshid, Take a look at this example on github - https://github.com/hienluu/structured-streaming-sources. Cheers, Hien On Thu, Jul 12, 2018 at 12:52 AM Farshid Zavareh wrote: > Hello. > > I need to create a custom streaming source by extending *FileStreamSource*. > The idea is to override *commit*, so that processed files (S3 objects in > my case) are renamed to have a certain prefix. However, I don't know how to > use this custom source. Obviously I don't want to compile Spark -- the > application will be running on Amazon EMR clusters. > > Thanks, > Farshid > -- Regards,
Re: Writing custom Structured Streaming receiver
Finally got a toy version of a Structured Streaming DataSource V2 source working with Apache Spark 2.3. Tested locally and on Databricks community edition. Source code is here - https://github.com/hienluu/wikiedit-streaming
Re: Writing custom Structured Streaming receiver
I finally got around to implementing a custom structured streaming receiver (source) to read Wikipedia edit events from the IRC server. It works fine locally as well as in spark-shell on my laptop. However, it failed with the following exception when running in Databricks community edition.

It seems like there is no way to create a DataFrame with isStreaming set to true if a class resides outside of the spark.sql package. I was looking at the RateSourceProvider class that comes with Apache Spark, and it is using an internal function to create a DataFrame with isStreaming set to true - sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

Does anyone have any suggestion? (other than waiting for DataSourceV2)

assertion failed: DataFrame returned by getBatch from did not have isStreaming=true
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$11.apply(StreamExecution.scala:674)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$11.apply(StreamExecution.scala:669)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:669)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:669)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:668)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:328)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:316)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:316)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:316)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:312)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
RE: How to convert Array of Json rows into Dataset of specific columns in Spark 2.2.0?
Hi Kant,

I am not sure whether you have come up with a solution yet, but the following works for me (in Scala):

val emp_info = """
[
  {"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs":[{"subject": "english", "year": 2016}]},
  {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs":[{"subject": "math", "year": 2017}]}
]"""

import org.apache.spark.sql.types._

val addressSchema = new StructType().add("state", StringType).add("country", StringType)
val docsSchema = ArrayType(new StructType().add("subject", StringType).add("year", IntegerType))
val employeeSchema = new StructType().add("name", StringType).add("address", addressSchema).add("docs", docsSchema)
val empInfoSchema = ArrayType(employeeSchema)

empInfoSchema.json

val empInfoStrDF = Seq((emp_info)).toDF("emp_info_str")
empInfoStrDF.printSchema
empInfoStrDF.show(false)

val empInfoDF = empInfoStrDF.select(from_json('emp_info_str, empInfoSchema).as("emp_info"))
empInfoDF.printSchema

empInfoDF.select(struct("*")).show(false)
empInfoDF.select("emp_info.name", "emp_info.address", "emp_info.docs").show(false)
empInfoDF.select(explode('emp_info.getItem("name"))).show
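To get one row per employee out of the parsed array (rather than per-column arrays), exploding the whole struct may be more convenient. A sketch along the same lines as the snippet above, reusing its empInfoDF:

```scala
// One row per element of the emp_info array; "emp" is then a struct
// whose fields can be addressed with dot notation.
val perEmployeeDF = empInfoDF
  .select(explode('emp_info).as("emp"))
  .select("emp.name", "emp.address.state", "emp.docs")
```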
Re: Writing custom Structured Streaming receiver
Cool. Thanks nezhazheng. I will give it a shot.
Re: Writing custom Structured Streaming receiver
Hi TD,

I looked at the DataStreamReader class and it looks like we can specify an FQCN as a source (provided that it implements the trait Source). The DataSource.lookupDataSource function will try to load this FQCN during the creation of a DataSource object instance inside DataStreamReader.load(). Will this work? I am curious if anyone has tried this yet. I am going to give it a shot to see if this works.

Thanks,
Hien
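For anyone following along, the idea described above would look roughly like this on the reader side. The class name is made up for illustration; the provider class would need to implement the source traits (e.g. StreamSourceProvider) for DataSource.lookupDataSource to resolve and use it:

```scala
// format() accepts a fully qualified class name, which
// DataSource.lookupDataSource attempts to load via Class.forName
// when no short-name alias is registered for it.
val events = spark.readStream
  .format("com.example.streaming.WikiEditSourceProvider") // hypothetical FQCN
  .option("host", "irc.wikimedia.org")                    // hypothetical source-specific option
  .load()
```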
Re: Spark build failure with com.oracle:ojdbc6:jar:11.2.0.1.0
No, I am not. I am considering downloading it manually and placing it in my local repository.

On Mon, May 2, 2016 at 5:54 PM, ☼ R Nair (रविशंकर नायर) <ravishankar.n...@gmail.com> wrote:
> The Oracle JDBC driver is not part of the Maven repository - are you keeping a downloaded
> file in your local repo?
>
> Best, RS

-- Regards,
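Manually installing the jar into the local repository would look something like this (the jar has to be downloaded from Oracle first; the file path is a placeholder):

```shell
# Install a locally downloaded ojdbc6 jar under the Maven coordinates
# the Spark build expects (com.oracle:ojdbc6:11.2.0.1.0).
mvn install:install-file \
  -Dfile=/path/to/ojdbc6.jar \
  -DgroupId=com.oracle \
  -DartifactId=ojdbc6 \
  -Dversion=11.2.0.1.0 \
  -Dpackaging=jar
```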
Spark build failure with com.oracle:ojdbc6:jar:11.2.0.1.0
Hi all,

I am running into a build problem with com.oracle:ojdbc6:jar:11.2.0.1.0. It kept getting "Operation timed out" while building the Spark Project Docker Integration Tests module (see the error below).

Has anyone run into this problem before? If so, how did you resolve it?

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................ SUCCESS [  2.423 s]
[INFO] Spark Project Test Tags ......................... SUCCESS [  0.712 s]
[INFO] Spark Project Sketch ............................ SUCCESS [  0.498 s]
[INFO] Spark Project Networking ........................ SUCCESS [  1.743 s]
[INFO] Spark Project Shuffle Streaming Service ......... SUCCESS [  0.587 s]
[INFO] Spark Project Unsafe ............................ SUCCESS [  0.503 s]
[INFO] Spark Project Launcher .......................... SUCCESS [  4.894 s]
[INFO] Spark Project Core .............................. SUCCESS [ 17.953 s]
[INFO] Spark Project GraphX ............................ SUCCESS [  3.480 s]
[INFO] Spark Project Streaming ......................... SUCCESS [  6.022 s]
[INFO] Spark Project Catalyst .......................... SUCCESS [  8.664 s]
[INFO] Spark Project SQL ............................... SUCCESS [ 12.440 s]
[INFO] Spark Project ML Local Library .................. SUCCESS [  0.498 s]
[INFO] Spark Project ML Library ........................ SUCCESS [  8.594 s]
[INFO] Spark Project Tools ............................. SUCCESS [  0.162 s]
[INFO] Spark Project Hive .............................. SUCCESS [  9.834 s]
[INFO] Spark Project HiveContext Compatibility ......... SUCCESS [  1.428 s]
[INFO] Spark Project Docker Integration Tests .......... FAILURE [02:32 min]
[INFO] Spark Project REPL .............................. SKIPPED
[INFO] Spark Project Assembly .......................... SKIPPED
[INFO] Spark Project External Flume Sink ............... SKIPPED
[INFO] Spark Project External Flume .................... SKIPPED
[INFO] Spark Project External Flume Assembly ........... SKIPPED
[INFO] Spark Project External Kafka .................... SKIPPED
[INFO] Spark Project Examples .......................... SKIPPED
[INFO] Spark Project External Kafka Assembly ........... SKIPPED
[INFO] Spark Project Java 8 Tests ...................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:53 min
[INFO] Finished at: 2016-05-02T17:44:57-07:00
[INFO] Final Memory: 80M/1525M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project spark-docker-integration-tests_2.11: Could not resolve dependencies for project org.apache.spark:spark-docker-integration-tests_2.11:jar:2.0.0-SNAPSHOT: Failed to collect dependencies at com.oracle:ojdbc6:jar:11.2.0.1.0: Failed to read artifact descriptor for com.oracle:ojdbc6:jar:11.2.0.1.0: Could not transfer artifact com.oracle:ojdbc6:pom:11.2.0.1.0 from/to uber-artifactory (http://artifactory.uber.internal:4587/artifactory/repo/): Connect to artifactory.uber.internal:4587 [artifactory.uber.internal/10.162.11.61] failed: Operation timed out -> [Help 1]
Re: Spark Streaming updateStateByKey Implementation
Thanks Zoltan. I will take a look at StateDStream.scala.

On Sun, Nov 8, 2015 at 2:42 AM, Zoltán Zvara <zoltan.zv...@gmail.com> wrote:
> It is implemented with cogroup. Basically it stores states in a separate
> RDD and cogroups the target RDD with the state RDD, which is then hidden
> from you. See StateDStream.scala, there is everything you need to know.
>
> On Fri, Nov 6, 2015 at 6:25 PM Hien Luu <h...@linkedin.com> wrote:
>> Hi,
>>
>> I am interested in learning about the implementation of updateStateByKey.
>> Does anyone know of a JIRA or design doc I can read?
>>
>> I did a quick search and couldn't find much info on the implementation.
>>
>> Thanks in advance,
>>
>> Hien
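For reference while reading StateDStream.scala, the user-facing side of that cogroup is the update function passed to updateStateByKey. A minimal running-count sketch (assumes a DStream of (word, 1) pairs named wordCounts and that checkpointing has been enabled on the StreamingContext, which stateful operations require):

```scala
// For each key, Spark cogroups the new values from the current batch
// with the previously stored state and applies this function;
// returning None drops the key from the state RDD.
val runningCounts = wordCounts.updateStateByKey[Int] {
  (newValues: Seq[Int], state: Option[Int]) =>
    Some(newValues.sum + state.getOrElse(0))
}
```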
Spark Streaming updateStateByKey Implementation
Hi,

I am interested in learning about the implementation of updateStateByKey. Does anyone know of a JIRA or design doc I can read?

I did a quick search and couldn't find much info on the implementation.

Thanks in advance,

Hien
Re: Spark job workflow engine recommendations
The Spark job type was added recently - see this pull request https://github.com/azkaban/azkaban-plugins/pull/195. You can leverage the SLA feature to kill a job if it runs longer than expected.

BTW, we just solved the scalability issue by supporting multiple executors. Within a week or two, the code for that should be merged into the main trunk.

Hien

On Tue, Oct 6, 2015 at 9:40 PM, Vikram Kone <vikramk...@gmail.com> wrote:
> Does Azkaban support scheduling long running jobs like spark streaming
> jobs? Will Azkaban kill a job if it's running for a long time.
Re: Spark job workflow engine recommendations
We are in the middle of figuring that out. At the high level, we want to combine the best parts of existing workflow solutions.

On Fri, Aug 7, 2015 at 3:55 PM, Vikram Kone <vikramk...@gmail.com> wrote:
> Hien,
> Is Azkaban being phased out at linkedin as rumored? If so, what's linkedin going to use for workflow scheduling? Is there something else that's going to replace Azkaban?
>
> On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>> In my opinion, choosing some particular project among its peers should leave enough room for future growth (which may come faster than you initially think).
>>
>> Cheers
>>
>> On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu <h...@linkedin.com> wrote:
>>> Scalability is a known issue due to the current architecture. However, this will only be applicable if you run more than 20K jobs per day.
>>>
>>> On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>> From what I heard (an ex-coworker who is an Oozie committer), Azkaban is being phased out at LinkedIn because of scalability issues (though UI-wise, Azkaban seems better).
>>>>
>>>> Vikram:
>>>> I suggest you do more research in related projects (maybe using their mailing lists).
>>>>
>>>> Disclaimer: I don't work for LinkedIn.
>>>>
>>>> On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>> Hi Vikram,
>>>>>
>>>>> We use Azkaban (2.5.0) in our production workflow scheduling. We just use local mode deployment and it is fairly easy to set up. It is pretty easy to use and has a nice scheduling and logging interface, as well as SLAs (like kill job and notify if it doesn't complete in 3 hours or whatever).
>>>>>
>>>>> However, Spark support is not present directly - we run everything with shell scripts and spark-submit. There is a plugin interface where one could create a Spark plugin, but I found it very cumbersome when I did investigate and didn't have the time to work through it to develop that.
>>>>>
>>>>> It has some quirks, and while there is actually a REST API for adding jobs and dynamically scheduling jobs, it is not documented anywhere, so you kinda have to figure it out for yourself. But in terms of ease of use I found it way better than Oozie. I haven't tried Chronos, and it seemed quite involved to set up. Haven't tried Luigi either.
>>>>>
>>>>> Spark job server is good but as you say lacks some stuff like scheduling and DAG type workflows (independent of spark-defined job flows).
>>>>>
>>>>> On Fri, Aug 7, 2015 at 7:00 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>>>>>> Check also falcon in combination with oozie
Re: Newbie question: what makes Spark run faster than MapReduce
This blog outlines a few things that make Spark faster than MapReduce - https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html

On Fri, Aug 7, 2015 at 9:13 AM, Muler <mulugeta.abe...@gmail.com> wrote:
> Consider the classic word count application over a 4 node cluster with a sizable working data set. What makes Spark run faster than MapReduce, considering that Spark also has to write to disk during the shuffle?
Re: Spark job workflow engine recommendations
Looks like Oozie can satisfy most of your requirements.

On Fri, Aug 7, 2015 at 8:43 AM, Vikram Kone <vikramk...@gmail.com> wrote:
> Hi,
> I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Oozie, Azkaban, Luigi, Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are:
>
> 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks.
> 2. Active open source community support and well tested at production scale.
> 3. Should be dead easy to write job dependencies using XML or a web interface. Ex: job A depends on Job B and Job C, so run Job A after B and C are finished. Don't need to write full blown java applications to specify job parameters and dependencies. Should be very simple to use.
> 4. Time based recurrent scheduling. Run the spark jobs at a given time every hour or day or week or month.
> 5. Job monitoring, alerting on failures and email notifications on a daily basis.
>
> I have looked at Ooyala's spark job server, which seems to be geared towards making spark jobs run faster by sharing contexts between the jobs, but isn't a full blown workflow engine per se. A combination of spark job server and a workflow engine would be ideal.
>
> Thanks for the inputs