Re: What could be the cause of an execution freeze on Hadoop for small datasets?
"In this case your program may work because effectively you are not using the spark in yarn on the hadoop cluster" — I am actually using YARN (client mode), as mentioned, and I already know what collectAsList does. The issue is not limited to collectAsList: the execution also freezes, for example, when calling save() on the dataset after the transformations (calling save() before them works fine). I hope the question is clearer now for anybody reading.

On Sat, 11 Mar 2023 at 20:15, Mich Talebzadeh wrote:

> collectAsList brings all the data into the driver, which is a single JVM on a single node. In this case your program may work because effectively you are not using Spark in YARN on the Hadoop cluster. The benefit of Spark is that you can process a large amount of data using the memory and processors across multiple executors on multiple nodes.
>
> HTH
>
> view my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: What could be the cause of an execution freeze on Hadoop for small datasets?
Not sure what you mean by your question, but it is not helping in any case.

On Sat, 11 Mar 2023 at 19:54, Mich Talebzadeh wrote:

> ... To note that if I execute collectAsList on the dataset at the beginning of the program
>
> What do you think collectAsList does?
What could be the cause of an execution freeze on Hadoop for small datasets?
Hello guys,

I am launching a Spark program through code (client mode) to run on Hadoop. If, after performing heavy transformations on the columns, I call methods on the dataset such as show(), count() or collectAsList() (these appear in the Spark UI), the execution freezes on Hadoop, independently of the dataset size (an intriguing issue for small datasets!).

Any idea what could be causing this type of issue?

Note that if I execute collectAsList on the dataset at the beginning of the program (before performing the transformations on the columns), the method yields results correctly.

Thanks.
Regards
How to allocate vcores to driver (client mode)
Hi,

I am launching a Spark program through code (client mode) to run on Hadoop. Whenever I check the Executors tab of the Spark UI, I always get 0 as the number of vcores for the driver. I tried to change that using *spark.driver.cores*, and also *spark.yarn.am.cores*, in the SparkSession configuration, but in vain. I also tried to set those parameters in spark-defaults but, again, with no success. Note that in the Environment tab the right config is displayed.

Could this be the reason why *collectAsList* freezes the execution (not having enough CPU)?
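For reference, a minimal sketch of where these settings usually go (values are illustrative). Per the standard Spark configuration docs, spark.driver.cores is honoured only in cluster mode — in client mode the driver is the JVM that launched the job, so YARN does not allocate vcores to it, which would explain the 0 in the Executors tab — while spark.yarn.am.cores sizes the separate YARN application master:

```
# spark-defaults.conf (illustrative values)
spark.driver.cores    2    # cluster mode only; ignored for a client-mode driver
spark.yarn.am.cores   2    # cores for the YARN application master in client mode
```

The equivalent spark-submit flags would be --conf spark.driver.cores=2 --conf spark.yarn.am.cores=2; either way, in client mode the driver's CPU comes from the submitting machine, not from YARN.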
How to share a dataset file across nodes
Hello,

I use YARN client mode to submit my driver program to Hadoop. The dataset I load is from the local file system, so when I invoke load("file://path") Spark complains about the CSV file not being found. This I totally understand, since the dataset is not on any of the workers or on the application master, but only where the driver program resides.

I tried to share the file using the configurations *spark.yarn.dist.files* and *spark.files*, but neither works.

My question is: how to share the CSV dataset across the nodes at the specified path?

Thanks.
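A commonly suggested workaround (a sketch; the paths are hypothetical) is to put the file on a storage layer every node can read, such as HDFS, and load it from there instead of distributing a local file:

```
# copy the local CSV to HDFS, then load it from an hdfs:// path in the program
hdfs dfs -mkdir -p /data
hdfs dfs -put /local/path/data.csv /data/data.csv
```

The driver would then read it with something like spark.read().format("csv").load("hdfs:///data/data.csv"), so the "file not found on workers" problem does not arise at all.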
Re: How to explode array columns of a dataframe having the same length
@Enrico Minack I used arrays_zip to merge the values into one row, and then used toJSON() to export the data. @Bjørn explode_outer didn't yield the expected results. Thanks anyway.

On Thu, 16 Feb 2023 at 09:06, Enrico Minack wrote:

> You have to take each row and zip the lists; each element of the result becomes one new row.
>
> So write a method that turns
> Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))
> into
> List(List("A","C","E"), List("B","D","null"), List("null","null","null"))
> and use flatMap with that method.
>
> In Scala, this would read:
>
> df.flatMap { row => (row.getSeq[String](0), row.getSeq[String](1), row.getSeq[String](2)).zipped.toIterable }.show()
>
> Enrico
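The row-wise zipping that Enrico describes (and that arrays_zip performs per row) can be illustrated in plain Java, independently of Spark — a standalone sketch of the transformation each row undergoes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipColumns {
    // Turn one row of equal-length column arrays into one output row per index,
    // e.g. ["A","B"], ["C","D"] -> ["A","C"], ["B","D"]
    static List<List<String>> zip(List<List<String>> columns) {
        int length = columns.get(0).size();
        List<List<String>> rows = new ArrayList<>();
        for (int i = 0; i < length; i++) {
            List<String> row = new ArrayList<>();
            for (List<String> column : columns) {
                row.add(column.get(i));
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // The thread's example row: three array columns of the same length
        List<List<String>> columns = Arrays.asList(
            Arrays.asList("A", "B", "null"),
            Arrays.asList("C", "D", "null"),
            Arrays.asList("E", "null", "null"));
        System.out.println(zip(columns));
        // -> [[A, C, E], [B, D, null], [null, null, null]]
    }
}
```

In Spark, flatMap applies exactly this per-row zip and flattens the resulting lists into new rows.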
How to explode array columns of a dataframe having the same length
Hello guys,

I have the following dataframe:

col1               col2               col3
["A","B","null"]   ["C","D","null"]   ["E","null","null"]

I want to explode it into the following dataframe:

col1     col2     col3
"A"      "C"      "E"
"B"      "D"      "null"
"null"   "null"   "null"

How to do that (preferably in Java) using the explode() method? Knowing that something like the following won't yield the correct output:

for (String colName : dataset.columns())
    dataset = dataset.withColumn(colName, explode(dataset.col(colName)));
Re: How to improve efficiency of this piece of code (returning distinct column values)
Alright, this is the working Java version of it:

List<Column> listCols = new ArrayList<>();
Arrays.asList(dataset.columns()).forEach(column -> {
    listCols.add(org.apache.spark.sql.functions.collect_set(column));
});
Column[] arrCols = listCols.toArray(new Column[listCols.size()]);
dataset = dataset.select(arrCols);

But then I tried to explode the sets of values into rows through explode(), and the column values repeat to fill the size of the largest column. How to set the repeated values to null instead (thus keeping only one exploded set of column values in each column)?

Thanks.

On Sun, 12 Feb 2023 at 22:43, Enrico Minack wrote:

> @Sean: This aggregate function does work without an explicit groupBy():
>
> (spark-shell, Spark 3.3.1, Scala 2.12.15, OpenJDK 64-Bit Server VM, Java 11.0.17)
>
> scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
> scala> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
> +------------+--------+----------+
> |           a|       b|         c|
> +------------+--------+----------+
> |[1, 2, 3, 4]|[20, 10]|[one, two]|
> +------------+--------+----------+
>
> @Sam: I haven't tested the Java code, sorry. I presume you can work it out from the working Scala code.
>
> Enrico
>
> On 12 Feb 2023 at 21:32, Sean Owen wrote:
>
>> It doesn't work because it's an aggregate function. You have to groupBy() (group by nothing) to make that work, but you can't assign that as a column. Folks, those approaches don't make sense semantically in SQL or Spark or anything. They just mean: use threads to collect() distinct values for each col in parallel in your program. You don't have to, but you could. What else are we looking for here? The answer has been given a number of times, I think.
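The null-padding asked about above can be sketched in plain Java, independently of Spark: pad each column's distinct-value list with nulls up to the length of the longest list, so that a subsequent row-wise explode yields each value exactly once instead of repeating shorter columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PadWithNulls {
    // Pad every list with nulls so they all reach the length of the longest one,
    // e.g. [1,2,3,4] and [20,10] -> [1,2,3,4] and [20,10,null,null]
    static List<List<String>> pad(List<List<String>> columns) {
        int max = 0;
        for (List<String> column : columns) {
            max = Math.max(max, column.size());
        }
        List<List<String>> padded = new ArrayList<>();
        for (List<String> column : columns) {
            List<String> copy = new ArrayList<>(column);
            while (copy.size() < max) {
                copy.add(null);
            }
            padded.add(copy);
        }
        return padded;
    }

    public static void main(String[] args) {
        System.out.println(pad(Arrays.asList(
            Arrays.asList("1", "2", "3", "4"),
            Arrays.asList("20", "10"))));
        // -> [[1, 2, 3, 4], [20, 10, null, null]]
    }
}
```

This is only the list-level logic; applying it in Spark would mean building equal-length array columns before exploding them together.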
Re: How to improve efficiency of this piece of code (returning distinct column values)
OK, what do you mean by "do your outer for loop in parallel"? By the way, this didn't work:

for (String columnName : df.columns()) {
    df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
}

On Sun, 12 Feb 2023 at 20:36, Enrico Minack wrote:

> That is unfortunate, but 3.4.0 is around the corner, really!
>
> Well, then, based on your code, I'd suggest two improvements:
> - cache your dataframe after reading; this way you don't read the entire file for each column
> - do your outer for loop in parallel; then you have N parallel Spark jobs (only helps if your Spark cluster is not fully occupied by a single column)
>
> Your withColumn approach does not work because withColumn expects a column as the second argument, but df.select(columnName).distinct() is a DataFrame and .col is a column in *that* DataFrame; it is not a column of the dataframe that you call withColumn on.
>
> It should read:
>
> Scala:
> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
>
> Java:
> for (String columnName : df.columns()) {
>     df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
> }
>
> Then you have a single DataFrame that computes all columns in a single Spark job.
>
> But this reads all distinct values into a single partition, which has the same downside as collect, so this is as bad as using collect.
>
> Cheers,
> Enrico
Re: How to improve efficiency of this piece of code (returning distinct column values)
@Sean Correct. But I was hoping to improve my solution even more.

On Sun, 12 Feb 2023 at 18:03, Sean Owen wrote:

> That's the answer, except you can never select a result set into a column, right? You just collect() each of those results. Or what do you want? I'm not clear.
Re: How to improve efficiency of this piece of code (returning distinct column values)
@Enrico Minack Thanks for "unpivot", but I am using version 3.3.0 (you are taking it way too far, as usual :) )
@Sean Owen Please then show me how it can be improved in code.

Also, why doesn't such an approach (using withColumn()) work:

for (String columnName : df.columns()) {
    df = df.withColumn(columnName, df.select(columnName).distinct().col(columnName));
}

On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:

> You could do the entire thing in DataFrame world and write the result to disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>
> Note this is Scala but should be straightforward to translate into Java:
>
> import org.apache.spark.sql.functions.collect_set
>
> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")
>
> df.unpivot(Array.empty, "column", "value")
>   .groupBy("column")
>   .agg(collect_set("value").as("distinct_values"))
>
> The unpivot operation turns
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1| 10|123|
> |  2| 20|124|
> |  3| 20|123|
> |  4| 10|123|
> +---+---+---+
>
> into
>
> +------+-----+
> |column|value|
> +------+-----+
> |     a|    1|
> |     b|   10|
> |     c|  123|
> |     a|    2|
> |     b|   20|
> |     c|  124|
> |     a|    3|
> |     b|   20|
> |     c|  123|
> |     a|    4|
> |     b|   10|
> |     c|  123|
> +------+-----+
>
> The groupBy("column").agg(collect_set("value").as("distinct_values")) collects distinct values per column:
>
> +------+---------------+
> |column|distinct_values|
> +------+---------------+
> |     c|     [123, 124]|
> |     b|       [20, 10]|
> |     a|   [1, 2, 3, 4]|
> +------+---------------+
>
> Note that unpivot only works if all columns have a "common" type. Then all columns are cast to that common type. If you have incompatible types like Integer and String, you would have to cast them all to String first:
>
> import org.apache.spark.sql.types.StringType
>
> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>
> If you want to preserve the type of the values and have multiple value types, you cannot put everything into a DataFrame with one distinct_values column. You could still have multiple DataFrames, one per data type, and write those, or collect the DataFrame's values into Maps:
>
> import scala.collection.immutable
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.collect_set
>
> // if all your columns have the same type
> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
>   df.unpivot(Array.empty, "column", "value")
>     .groupBy("column")
>     .agg(collect_set("value").as("distinct_values"))
>     .collect()
>     .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>     .toMap
> }
>
> // if your columns have different types
> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
>   df.schema.fields
>     .groupBy(_.dataType)
>     .mapValues(_.map(_.name))
>     .par
>     .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>     .map(distinctValuesPerColumnOneType)
>     .flatten
>     .toList
>     .toMap
> }
>
> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
> distinctValuesPerColumn(df)
>
> The result is (list values are of the original type):
> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>
> Hope this helps,
> Enrico
Re: How to improve efficiency of this piece of code (returning distinct column values)
I am not sure I understand well "Just need to do the cols one at a time". Plus, I think Apostolos is right: this needs a dataframe approach, not a list approach.

On Fri, 10 Feb 2023 at 22:47, Sean Owen wrote:

> For each column, select only that col and get distinct values. Similar to what you do here. Just need to do the cols one at a time. Your current code doesn't do what you want.
Re: How to improve efficiency of this piece of code (returning distinct column values)
Hi Apostolos,

Can you suggest a better approach while keeping the values within a dataframe?

On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:

> Dear Sam,
>
> You are assuming that the data fits in the memory of your local machine. You are using as a basis a dataframe, which can potentially be very large, and then you are storing the data in local lists. Keep in mind that the number of distinct elements in a column may be very large (depending on the app). I suggest working on a solution that assumes that the number of distinct values is also large. Thus, you should keep your data in dataframes or RDDs, and store them as CSV files, Parquet, etc.
>
> a.p.
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
Re: How to improve efficiency of this piece of code (returning distinct column values)
Hi Sean,

"You need to select the distinct values of each col one at a time" — how?

On Fri, 10 Feb 2023 at 22:40, Sean Owen wrote:

> That gives you all distinct tuples of those col values. You need to select the distinct values of each col one at a time. Sure, just collect() the result as you do here.
How to improve efficiency of this piece of code (returning distinct column values)
I want to get the distinct values of each column in a List (is it good practice to use List here?) that contains as its first element the column name and as the other elements the distinct values, so that for a dataset we get a list of lists. I do it this way (in my opinion not so fast):

List<List<String>> finalList = new ArrayList<List<String>>();
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<String>();
    columnList.add(columnNames[i]);
    List<Row> columnValues = df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).apply(0).toString());
    finalList.add(columnList);
}

How to improve this?

Also, can I get the results in JSON format?
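On the JSON question: once the distinct values are collected into the list-of-lists shape above, they can be serialised by hand in plain Java. This is only a sketch without third-party libraries (the quoting is naive and assumes values contain no quote characters); in practice Dataset.toJSON() or a proper JSON library would be the usual route.

```java
import java.util.Arrays;
import java.util.List;

public class ColumnsToJson {
    // Serialise [[colName, v1, v2, ...], ...] into a small JSON object,
    // e.g. [["a","1","2"]] -> {"a": ["1", "2"]}. Naive quoting: values
    // must not themselves contain '"'.
    static String toJson(List<List<String>> columns) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < columns.size(); i++) {
            List<String> column = columns.get(i);
            if (i > 0) sb.append(", ");
            sb.append('"').append(column.get(0)).append("\": [");
            for (int j = 1; j < column.size(); j++) {
                if (j > 1) sb.append(", ");
                sb.append('"').append(column.get(j)).append('"');
            }
            sb.append(']');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        System.out.println(toJson(Arrays.asList(
            Arrays.asList("a", "1", "2"),
            Arrays.asList("b", "10", "20"))));
        // -> {"a": ["1", "2"], "b": ["10", "20"]}
    }
}
```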
Can we upload a csv dataset into Hive using SparkSQL?
Hello,

I want to create a table in Hive and then load a CSV file's content into it, all by means of Spark SQL. I saw in the docs the example with the .txt file, but can we instead do something like the following to accomplish what I want?

String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark Hive Example")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS csvFile USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'C:/Users/Me/Documents/examples/src/main/resources/data.csv' INTO TABLE csvFile");
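Something close to this should be possible, but the table needs an explicit schema and a row format matching the CSV, since LOAD DATA moves the file as-is without parsing it. A sketch (table and column names are illustrative, and untested here):

```sql
CREATE TABLE IF NOT EXISTS csvFile (col1 STRING, col2 INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count' = '1');

LOAD DATA LOCAL INPATH '/path/to/data.csv' INTO TABLE csvFile;
```

An alternative that handles headers, quoting and type inference is to read the file with spark.read().format("csv") and then call write().saveAsTable("csvFile") on the resulting Dataset.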
Re: Aggregate over a column: the proper way to do
Exact: one row, and two columns.

On Sat, 9 Apr 2022 at 17:44, Sean Owen wrote:

> But it only has one row, right?
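Sean's suggestion in this thread — filter, then count, with no groupBy, since after the filter there is only one group — can be illustrated with plain Java streams (a Spark-free sketch of the same logic):

```java
import java.util.Arrays;
import java.util.List;

public class FilterThenCount {
    // Counting rows matching a value: filter then count. Grouping the filtered
    // rows first would produce a single group whose count is this same number.
    static long countMatching(List<String> values, String target) {
        return values.stream().filter(target::equals).count();
    }

    public static void main(String[] args) {
        List<String> myCol = Arrays.asList("x", "myTargetVal", "y", "myTargetVal");
        System.out.println(countMatching(myCol, "myTargetVal"));
        // -> 2
    }
}
```

The Spark equivalent of the one-liner in this thread collapses to dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).count(), which returns the long directly with no select("count(myCol)") step.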
Re: Aggregate over a column: the proper way to do
Yes. Returns the number of rows in the Dataset as *long*. but in my case the aggregation returns a table of two columns. Le ven. 8 avr. 2022 à 14:12, Sean Owen a écrit : > Dataset.count() returns one value directly? > > On Thu, Apr 7, 2022 at 11:25 PM sam smith > wrote: > >> My bad, yes of course that! still i don't like the .. >> select("count(myCol)") .. part in my line is there any replacement to that ? >> >> Le ven. 8 avr. 2022 à 06:13, Sean Owen a écrit : >> >>> Just do an average then? Most of my point is that filtering to one group >>> and then grouping is pointless. >>> >>> On Thu, Apr 7, 2022, 11:10 PM sam smith >>> wrote: >>> >>>> What if i do avg instead of count? >>>> >>>> Le ven. 8 avr. 2022 à 05:32, Sean Owen a écrit : >>>> >>>>> Wait, why groupBy at all? After the filter only rows with myCol equal >>>>> to your target are left. There is only one group. Don't group just count >>>>> after the filter? >>>>> >>>>> On Thu, Apr 7, 2022, 10:27 PM sam smith >>>>> wrote: >>>>> >>>>>> I want to aggregate a column by counting the number of rows having >>>>>> the value "myTargetValue" and return the result >>>>>> I am doing it like the following:in JAVA >>>>>> >>>>>>> long result = >>>>>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0); >>>>>> >>>>>> >>>>>> Is that the right way? if no, what if a more optimized way to do that >>>>>> (always in JAVA)? >>>>>> Thanks for the help. >>>>>> >>>>>
Re: Aggregate over a column: the proper way to do
My bad, yes of course, that! Still, I don't like the select("count(myCol)") part in my line; is there any replacement for that? On Fri, Apr 8, 2022 at 06:13, Sean Owen wrote: > Just do an average then? Most of my point is that filtering to one group > and then grouping is pointless. > > On Thu, Apr 7, 2022, 11:10 PM sam smith > wrote: > >> What if I do avg instead of count? >> >> On Fri, Apr 8, 2022 at 05:32, Sean Owen wrote: >> >>> Wait, why groupBy at all? After the filter only rows with myCol equal to >>> your target are left. There is only one group. Don't group, just count after >>> the filter? >>> >>> On Thu, Apr 7, 2022, 10:27 PM sam smith >>> wrote: >>> >>>> I want to aggregate a column by counting the number of rows having the >>>> value "myTargetValue" and return the result. I am doing it like the following, in Java: >>>> >>>>> long result = >>>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0); >>>> >>>> >>>> Is that the right way? If not, is there a more optimized way to do that >>>> (still in Java)? Thanks for the help. >>>> >>>
Re: Aggregate over a column: the proper way to do
What if I do avg instead of count? On Fri, Apr 8, 2022 at 05:32, Sean Owen wrote: > Wait, why groupBy at all? After the filter only rows with myCol equal to > your target are left. There is only one group. Don't group, just count after > the filter? > > On Thu, Apr 7, 2022, 10:27 PM sam smith > wrote: > >> I want to aggregate a column by counting the number of rows having the >> value "myTargetValue" and return the result. I am doing it like the following, in Java: >> >>> long result = >>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0); >> >> >> Is that the right way? If not, is there a more optimized way to do that >> (still in Java)? Thanks for the help. >> >
Aggregate over a column: the proper way to do
I want to aggregate a column by counting the number of rows having the value "myTargetValue" and return the result. I am doing it like the following, in Java:

long result = dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);

Is that the right way? If not, is there a more optimized way to do that (still in Java)? Thanks for the help.
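Sean's advice in the replies above boils down to filter-then-count: after the filter, only one "group" remains, so in Spark the whole pipeline collapses to dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).count(). The same logic can be illustrated with plain Java collections (a sketch with made-up sample values, no Spark dependency):

```java
import java.util.List;

public class FilterCountSketch {
    public static void main(String[] args) {
        // Made-up sample values standing in for the dataset's "myCol" column.
        List<String> myCol = List.of("a", "myTargetVal", "b", "myTargetVal");

        // Equivalent of filter(col("myCol").equalTo("myTargetVal")).count():
        // no groupBy/agg/select step is needed, because the filter already
        // leaves exactly the rows to count.
        long result = myCol.stream()
                .filter("myTargetVal"::equals)
                .count();

        System.out.println(result); // 2
    }
}
```

The groupBy/agg/select/first chain in the original line computes the same number, but with an unnecessary shuffle for the grouping step.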
Re: Spark execution on Hadoop cluster (many nodes)
spark-submit a spark application on Hadoop (cluster mode) that's what i mean by executing on Hadoop Le lun. 24 janv. 2022 à 18:00, Sean Owen a écrit : > I am still not understanding what you mean by "executing on Hadoop". Spark > does not use Hadoop for execution. Probably can't answer until this is > cleared up. > > On Mon, Jan 24, 2022 at 10:57 AM sam smith > wrote: > >> I mean the DAG order is somehow altered when executing on Hadoop >> >> Le lun. 24 janv. 2022 à 17:17, Sean Owen a écrit : >> >>> Code is not executed by Hadoop, nor passed through Hadoop somehow. Do >>> you mean data? data is read as-is. There is typically no guarantee about >>> ordering of data in files but you can order data. Still not sure what >>> specifically you are worried about here, but I don't think the kind of >>> thing you're contemplating can happen, no >>> >>> On Mon, Jan 24, 2022 at 9:28 AM sam smith >>> wrote: >>> >>>> I am aware of that, but whenever the chunks of code are returned to >>>> Spark from Hadoop (after processing) could they be done not in the ordered >>>> way ? could this ever happen ? >>>> >>>> Le lun. 24 janv. 2022 à 16:14, Sean Owen a écrit : >>>> >>>>> Hadoop does not run Spark programs, Spark does. How or why would >>>>> something, what, modify the byte code? No >>>>> >>>>> On Mon, Jan 24, 2022, 9:07 AM sam smith >>>>> wrote: >>>>> >>>>>> My point is could Hadoop go wrong about one Spark execution ? meaning >>>>>> that it gets confused (given the concurrent distributed tasks) and then >>>>>> adds wrong instruction to the program, or maybe does execute an >>>>>> instruction >>>>>> not at its right order (shuffling the order of execution by executing >>>>>> previous ones, while it shouldn't) ? Before finishing and returning the >>>>>> results from one node it returns the results of the other in a wrong way >>>>>> for example. >>>>>> >>>>>> Le lun. 24 janv. 2022 à 15:31, Sean Owen a écrit : >>>>>> >>>>>>> Not clear what you mean here. 
A Spark program is a program, so what >>>>>>> are the alternatives here? program execution order is still program >>>>>>> execution order. You are not guaranteed anything about order of >>>>>>> concurrent >>>>>>> tasks. Failed tasks can be reexecuted so should be idempotent. I think >>>>>>> the >>>>>>> answer is 'no' but not sure what you are thinking of here. >>>>>>> >>>>>>> On Mon, Jan 24, 2022 at 7:10 AM sam smith < >>>>>>> qustacksm2123...@gmail.com> wrote: >>>>>>> >>>>>>>> Hello guys, >>>>>>>> >>>>>>>> I hope my question does not sound weird, but could a Spark >>>>>>>> execution on Hadoop cluster give different output than the program >>>>>>>> actually >>>>>>>> does ? I mean by that, the execution order is messed by hadoop, or an >>>>>>>> instruction executed twice..; ? >>>>>>>> >>>>>>>> Thanks for your enlightenment >>>>>>>> >>>>>>>
Re: Spark execution on Hadoop cluster (many nodes)
I mean the DAG order is somehow altered when executing on Hadoop Le lun. 24 janv. 2022 à 17:17, Sean Owen a écrit : > Code is not executed by Hadoop, nor passed through Hadoop somehow. Do you > mean data? data is read as-is. There is typically no guarantee about > ordering of data in files but you can order data. Still not sure what > specifically you are worried about here, but I don't think the kind of > thing you're contemplating can happen, no > > On Mon, Jan 24, 2022 at 9:28 AM sam smith > wrote: > >> I am aware of that, but whenever the chunks of code are returned to Spark >> from Hadoop (after processing) could they be done not in the ordered way ? >> could this ever happen ? >> >> Le lun. 24 janv. 2022 à 16:14, Sean Owen a écrit : >> >>> Hadoop does not run Spark programs, Spark does. How or why would >>> something, what, modify the byte code? No >>> >>> On Mon, Jan 24, 2022, 9:07 AM sam smith >>> wrote: >>> >>>> My point is could Hadoop go wrong about one Spark execution ? meaning >>>> that it gets confused (given the concurrent distributed tasks) and then >>>> adds wrong instruction to the program, or maybe does execute an instruction >>>> not at its right order (shuffling the order of execution by executing >>>> previous ones, while it shouldn't) ? Before finishing and returning the >>>> results from one node it returns the results of the other in a wrong way >>>> for example. >>>> >>>> Le lun. 24 janv. 2022 à 15:31, Sean Owen a écrit : >>>> >>>>> Not clear what you mean here. A Spark program is a program, so what >>>>> are the alternatives here? program execution order is still program >>>>> execution order. You are not guaranteed anything about order of concurrent >>>>> tasks. Failed tasks can be reexecuted so should be idempotent. I think the >>>>> answer is 'no' but not sure what you are thinking of here. 
>>>>> >>>>> On Mon, Jan 24, 2022 at 7:10 AM sam smith >>>>> wrote: >>>>> >>>>>> Hello guys, >>>>>> >>>>>> I hope my question does not sound weird, but could a Spark execution >>>>>> on Hadoop cluster give different output than the program actually does ? >>>>>> I >>>>>> mean by that, the execution order is messed by hadoop, or an instruction >>>>>> executed twice..; ? >>>>>> >>>>>> Thanks for your enlightenment >>>>>> >>>>>
Re: Spark execution on Hadoop cluster (many nodes)
I am aware of that, but whenever the chunks of code are returned to Spark from Hadoop (after processing), could they come back out of order? Could this ever happen? On Mon, Jan 24, 2022 at 16:14, Sean Owen wrote: > Hadoop does not run Spark programs, Spark does. How or why would > something, what, modify the byte code? No > > On Mon, Jan 24, 2022, 9:07 AM sam smith > wrote: > >> My point is: could Hadoop go wrong about one Spark execution? Meaning >> that it gets confused (given the concurrent distributed tasks) and then >> adds a wrong instruction to the program, or maybe executes an instruction >> out of order (shuffling the order of execution by executing >> previous ones, while it shouldn't)? Before finishing and returning the >> results from one node it returns the results of the other in a wrong way, >> for example. >> >> On Mon, Jan 24, 2022 at 15:31, Sean Owen wrote: >> >>> Not clear what you mean here. A Spark program is a program, so what are >>> the alternatives here? Program execution order is still program execution >>> order. You are not guaranteed anything about the order of concurrent tasks. >>> Failed tasks can be re-executed, so they should be idempotent. I think the answer >>> is 'no' but not sure what you are thinking of here. >>> >>> On Mon, Jan 24, 2022 at 7:10 AM sam smith >>> wrote: >>> >>>> Hello guys, >>>> >>>> I hope my question does not sound weird, but could a Spark execution on a >>>> Hadoop cluster give a different output than the program actually specifies? I >>>> mean by that: the execution order is messed up by Hadoop, or an instruction >>>> executed twice? >>>> >>>> Thanks for your enlightenment >>>> >>>
Re: Spark execution on Hadoop cluster (many nodes)
My point is: could Hadoop go wrong about one Spark execution? Meaning that it gets confused (given the concurrent distributed tasks) and then adds a wrong instruction to the program, or maybe executes an instruction out of order (shuffling the order of execution by executing previous ones, while it shouldn't)? Before finishing and returning the results from one node it returns the results of the other in a wrong way, for example. On Mon, Jan 24, 2022 at 15:31, Sean Owen wrote: > Not clear what you mean here. A Spark program is a program, so what are > the alternatives here? Program execution order is still program execution > order. You are not guaranteed anything about the order of concurrent tasks. > Failed tasks can be re-executed, so they should be idempotent. I think the answer > is 'no' but not sure what you are thinking of here. > > On Mon, Jan 24, 2022 at 7:10 AM sam smith > wrote: > >> Hello guys, >> >> I hope my question does not sound weird, but could a Spark execution on a >> Hadoop cluster give a different output than the program actually specifies? I >> mean by that: the execution order is messed up by Hadoop, or an instruction >> executed twice? >> >> Thanks for your enlightenment >> >
Spark execution on Hadoop cluster (many nodes)
Hello guys, I hope my question does not sound weird, but could a Spark execution on a Hadoop cluster give a different output than the program actually specifies? I mean by that: could the execution order be messed up by Hadoop, or an instruction be executed twice? Thanks for your enlightenment.
Re: About some Spark technical help
Thanks for the feedback Andrew. Le sam. 25 déc. 2021 à 03:17, Andrew Davidson a écrit : > Hi Sam > > It is kind of hard to review straight code. Adding some some sample data, > a unit test and expected results. Would be a good place to start. Ie. > Determine the fidelity of your implementation compared to the original. > > Also a verbal description of the algo would be helpful > > Happy Holidays > > Andy > > On Fri, Dec 24, 2021 at 3:17 AM sam smith > wrote: > >> Hi Gourav, >> >> Good question! that's the programming language i am most proficient at. >> You are always welcome to suggest corrective remarks about my (Spark) >> code. >> >> Kind regards. >> >> Le ven. 24 déc. 2021 à 11:58, Gourav Sengupta >> a écrit : >> >>> Hi, >>> >>> out of sheer and utter curiosity, why JAVA? >>> >>> Regards, >>> Gourav Sengupta >>> >>> On Thu, Dec 23, 2021 at 5:10 PM sam smith >>> wrote: >>> >>>> Hi Andrew, >>>> >>>> Thanks, here's the Github repo to the code and the publication : >>>> https://github.com/SamSmithDevs10/paperReplicationForReview >>>> >>>> Kind regards >>>> >>>> Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson a >>>> écrit : >>>> >>>>> Hi Sam >>>>> >>>>> >>>>> >>>>> Can you tell us more? What is the algorithm? Can you send us the URL >>>>> the publication >>>>> >>>>> >>>>> >>>>> Kind regards >>>>> >>>>> >>>>> >>>>> Andy >>>>> >>>>> >>>>> >>>>> *From: *sam smith >>>>> *Date: *Wednesday, December 22, 2021 at 10:59 AM >>>>> *To: *"user@spark.apache.org" >>>>> *Subject: *About some Spark technical help >>>>> >>>>> >>>>> >>>>> Hello guys, >>>>> >>>>> >>>>> >>>>> I am replicating a paper's algorithm in Spark / Java, and want to ask >>>>> you guys for some assistance to validate / review about 150 lines of code. >>>>> My github repo contains both my java class and the related paper, >>>>> >>>>> >>>>> >>>>> Any interested reviewer here ? >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> Thanks. >>>>> >>>>
Re: About some Spark technical help
Hi Gourav, Good question! that's the programming language i am most proficient at. You are always welcome to suggest corrective remarks about my (Spark) code. Kind regards. Le ven. 24 déc. 2021 à 11:58, Gourav Sengupta a écrit : > Hi, > > out of sheer and utter curiosity, why JAVA? > > Regards, > Gourav Sengupta > > On Thu, Dec 23, 2021 at 5:10 PM sam smith > wrote: > >> Hi Andrew, >> >> Thanks, here's the Github repo to the code and the publication : >> https://github.com/SamSmithDevs10/paperReplicationForReview >> >> Kind regards >> >> Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson a >> écrit : >> >>> Hi Sam >>> >>> >>> >>> Can you tell us more? What is the algorithm? Can you send us the URL the >>> publication >>> >>> >>> >>> Kind regards >>> >>> >>> >>> Andy >>> >>> >>> >>> *From: *sam smith >>> *Date: *Wednesday, December 22, 2021 at 10:59 AM >>> *To: *"user@spark.apache.org" >>> *Subject: *About some Spark technical help >>> >>> >>> >>> Hello guys, >>> >>> >>> >>> I am replicating a paper's algorithm in Spark / Java, and want to ask >>> you guys for some assistance to validate / review about 150 lines of code. >>> My github repo contains both my java class and the related paper, >>> >>> >>> >>> Any interested reviewer here ? >>> >>> >>> >>> >>> >>> Thanks. >>> >>
Re: About some Spark technical help
Hi Andrew, Thanks, here's the Github repo to the code and the publication : https://github.com/SamSmithDevs10/paperReplicationForReview Kind regards Le jeu. 23 déc. 2021 à 17:58, Andrew Davidson a écrit : > Hi Sam > > > > Can you tell us more? What is the algorithm? Can you send us the URL the > publication > > > > Kind regards > > > > Andy > > > > *From: *sam smith > *Date: *Wednesday, December 22, 2021 at 10:59 AM > *To: *"user@spark.apache.org" > *Subject: *About some Spark technical help > > > > Hello guys, > > > > I am replicating a paper's algorithm in Spark / Java, and want to ask you > guys for some assistance to validate / review about 150 lines of code. My > github repo contains both my java class and the related paper, > > > > Any interested reviewer here ? > > > > > > Thanks. >
dataset partitioning algorithm implementation help
Hello All, I am replicating a paper's algorithm about a partitioning approach to anonymize datasets with Spark / Java, and want to ask you for some help reviewing my 150 lines of code. My github repo, linked below, contains both my java class and the related paper: https://github.com/SamSmithDevs10/paperReplicationForReview Thanks in advance.
About some Spark technical help
Hello guys, I am replicating a paper's algorithm in Spark / Java, and want to ask you guys for some assistance to validate / review about 150 lines of code. My github repo contains both my java class and the related paper, Any interested reviewer here ? Thanks.
Re: About some Spark technical assistance
You were added to the repo to contribute, thanks. I included the java class and the paper I am replicating. On Mon, Dec 13, 2021 at 04:27, wrote: > github url please. > > On 2021-12-13 01:06, sam smith wrote: > > Hello guys, > > > > I am replicating a paper's algorithm (graph coloring algorithm) in > > Spark under Java, and thought about asking you guys for some > > assistance to validate / review my 600 lines of code. Any volunteers > > to share the code with? > > Thanks >
About some Spark technical assistance
Hello guys, I am replicating a paper's algorithm (graph coloring algorithm) in Spark under Java, and thought about asking you guys for some assistance to validate / review my 600 lines of code. Any volunteers to share the code with ? Thanks
Re: Parquet Metadata
Hi, I only know about comments, which you can add to each column and in which you can put these key values. Thanks. On Wed, Jun 23, 2021 at 11:31 AM Bode, Meikel, NMA-CFD < meikel.b...@bertelsmann.de> wrote: > Hi folks, > > Maybe not the right audience, but maybe you have come across such a requirement. > Is it possible to define a parquet schema that contains technical column > names and a list of translations for a certain column name into different > languages? > > I give an example: > Technical: “custnr” would translate to { EN: ”Customer Number”, DE: > “Kundennummer”} > > We could of course deliver a metadata file containing such language > mappings, but our question is whether we could embed that info into the > parquet metadata? > > Thanks a lot, > Meikel >
Re: Distributing a FlatMap across a Spark Cluster
Like I said In my previous email, can you try this and let me know how many tasks you see? val repRdd = scoredRdd.repartition(50).cache() repRdd.take(1) Then map operation on repRdd here. I’ve done similar map operations in the past and this works. Thanks. On Wed, Jun 9, 2021 at 11:17 AM Tom Barber wrote: > Also just to follow up on that slightly, I did also try off the back of > another comment: > > def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = { > val job = this.job.asInstanceOf[SparklerJob] > > val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d)) > > val scoreUpdateRdd: RDD[SolrInputDocument] = > scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d)) > > > Where I repartitioned that scoredRdd map out of interest, it then triggers > the FairFetcher function there, instead of in the runJob(), but still on a > single executor > > Tom > > On Wed, Jun 9, 2021 at 4:11 PM Tom Barber wrote: > >> >> Okay so what happens is that the crawler reads a bunch of solr data, >> we're not talking GB's just a list of JSON and turns that into a bunch of >> RDD's that end up in that flatmap that I linked to first. >> >> The fair fetcher is an interface to a pluggable backend that basically >> takes some of the fields and goes and crawls websites listed in them >> looking for information. We wrote this code 6 years ago for a DARPA project >> tracking down criminals on the web. Now I'm reusing it but trying to force >> it to scale out a bit more. >> >> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want >> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel >> on one node makes my cluster sad) to each executor and have it run a crawl, >> then move on and get another one and so on. That way you're not saturating >> a node trying to look up all of them and you could add more nodes for >> greater capacity pretty quickly. 
Once the website has been captured, you >> can then "score" it for want of a better term to determine its usefulness, >> which is where the map is being triggered. >> >> In answer to your questions Sean, no action seems triggered until you end >> up in the score block and the sc.runJob() because thats literally the next >> line of functionality as Kafka isn't enabled. >> >> val fetchedRdd = rdd.map(r => (r.getGroup, r)) >> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, >> rs.iterator, localFetchDelay, >> FetchFunction, ParseFunction, OutLinkFilterFunction, >> StatusUpdateSolrTransformer).toSeq }) >> .persist() >> >> if (kafkaEnable) { >> storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd) >> } >> val scoredRdd = score(fetchedRdd) >> >> >> That if block is disabled so the score function runs. Inside of that: >> >> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = { >> val job = this.job.asInstanceOf[SparklerJob] >> >> val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d)) >> >> val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => >> ScoreUpdateSolrTransformer(d)) >> val scoreUpdateFunc = new SolrStatusUpdate(job) >> sc.runJob(scoreUpdateRdd, scoreUpdateFunc) >> >> >> >> When its doing stuff in the SparkUI I can see that its waiting on the >> sc.runJob() line, so thats the execution point. >> >> >> Tom >> >> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen wrote: >> >>> persist() doesn't even persist by itself - just sets it to be persisted >>> when it's executed. >>> key doesn't matter here, nor partitioning, if this code is trying to run >>> things on the driver inadvertently. >>> I don't quite grok what the OSS code you linked to is doing, but it's >>> running some supplied functions very directly and at a low-level with >>> sc.runJob, which might be part of how this can do something unusual. >>> How do you trigger any action? 
what happens after persist() >>> >>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber wrote: >>> Thanks Mich, The key on the first iteration is just a string that says "seed", so it is indeed on the first crawl the same across all of the groups. Further iterations would be different, but I'm not there yet. I was under the impression that a repartition would distribute the tasks. Is that not the case? Thanks Tom On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh < mich.talebza...@gmail.com> wrote: > Hi Tom, > > Persist() here simply means persist to memory). That is all. You can > check UI tab on storage > > > https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence > > So I gather the code is stuck from your link in the driver. You stated > that you tried repartition() but it did not do anything, > > Further you stated : > > " The key is pretty static in these tests, so I have also tried > forcing the partition count (50 on a 16 core per node cluster) and also >
Re: REST Structured Steaming Sink
Hi Folks, Great discussion! I will take rate-limiting into account and make it configurable for the HTTP request as well. All I was wondering is whether there is anything I might have missed that would make this technically impossible, or at least difficult enough not to warrant the effort. Is there anything I might have overlooked? Also, would this be useful to people? From a business perspective, why make people wait until the next scheduled batch run for data that is already available from an API? You could run a job every minute/hour, but that in itself sounds like a streaming use case. Thoughts? Regards, Sam On Thu, Jul 2, 2020 at 3:31 AM Burak Yavuz wrote: > Well, the difference is, a technical user writes the UDF and a > non-technical user may use this built-in thing (misconfigure it) and shoot > themselves in the foot. > > On Wed, Jul 1, 2020, 6:40 PM Andrew Melo wrote: > >> On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz wrote: >> > >> > I'm not sure having a built-in sink that allows you to DDOS servers is >> the best idea either. foreachWriter is typically used for such use cases, >> not foreachBatch. It's also pretty hard to guarantee exactly-once, rate >> limiting, etc. >> >> If you control the machines and can run arbitrary code, you can DDOS >> whatever you want. What's the difference between this proposal and >> writing a UDF that opens 1,000 connections to a target machine? >> >> > Best, >> > Burak >> > >> > On Wed, Jul 1, 2020 at 5:54 PM Holden Karau >> wrote: >> >> >> >> I think adding something like this (if it doesn't already exist) could >> help make structured streaming easier to use, foreachBatch is not the best >> API. 
>> >> >> >> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim < >> kabhwan.opensou...@gmail.com> wrote: >> >>> >> >>> I guess the method, query parameter, header, and the payload would be >> all different for almost every use case - that makes it hard to generalize >> and requires implementation to be pretty much complicated to be flexible >> enough. >> >>> >> >>> I'm not aware of any custom sink implementing REST so your best bet >> would be simply implementing your own with foreachBatch, but so someone >> might jump in and provide a pointer if there is something in the Spark >> ecosystem. >> >>> >> >>> Thanks, >> >>> Jungtaek Lim (HeartSaVioR) >> >>> >> >>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin >> wrote: >> >>>> >> >>>> Hi All, >> >>>> >> >>>> >> >>>> We ingest alot of restful APIs into our lake and I'm wondering if it >> is at all possible to created a rest sink in structured streaming? >> >>>> >> >>>> For now I'm only focusing on restful services that have an >> incremental ID so my sink can just poll for new data then ingest. >> >>>> >> >>>> I can't seem to find a connector that does this and my gut instinct >> tells me it's probably because it isn't possible due to something >> completely obvious that I am missing >> >>>> >> >>>> I know some RESTful API obfuscate the IDs to a hash of strings and >> that could be a problem but since I'm planning on focusing on just >> numerical IDs that just get incremented I think I won't be facing that issue >> >>>> >> >>>> >> >>>> Can anyone let me know if this sounds like a daft idea? Will I need >> something like Kafka or kinesis as a buffer and redundancy or am I >> overthinking this? 
>> >>>> >> >>>> >> >>>> I would love to bounce ideas with people who runs structured >> streaming jobs in production >> >>>> >> >>>> >> >>>> Kind regards >> >>>> San >> >>>> >> >>>> >> >> >> >> >> >> -- >> >> Twitter: https://twitter.com/holdenkarau >> >> Books (Learning Spark, High Performance Spark, etc.): >> https://amzn.to/2MaRAG9 >> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau >> >
REST Structured Steaming Sink
Hi All, We ingest a lot of RESTful APIs into our lake, and I'm wondering if it is at all possible to create a REST sink in structured streaming? For now I'm only focusing on RESTful services that have an incremental ID, so my sink can just poll for new data and then ingest it. I can't seem to find a connector that does this, and my gut instinct tells me it's probably because it isn't possible due to something completely obvious that I am missing. I know some RESTful APIs obfuscate the IDs to a hash of strings, and that could be a problem, but since I'm planning on focusing on just numerical IDs that get incremented, I think I won't be facing that issue. Can anyone let me know if this sounds like a daft idea? Will I need something like Kafka or Kinesis as a buffer and redundancy, or am I overthinking this? I would love to bounce ideas with people who run structured streaming jobs in production. Kind regards, Sam
Avro file question
Hi, How do we choose between a single large Avro file (size much larger than the HDFS block size) vs multiple smaller Avro files (close to the HDFS block size)? Since Avro is splittable, is there even a need to split a very large Avro file into smaller files? I'm assuming that a single large Avro file can also be split across multiple mappers/reducers/executors during processing. Thanks.
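As a rough illustration of why a single large splittable file still parallelizes: the number of input splits is approximately the file size divided by the split size (typically the HDFS block size), so one big Avro file yields many parallel tasks anyway. A back-of-the-envelope sketch (the 10 GB / 128 MB figures are made up for illustration):

```java
public class AvroSplitEstimate {
    public static void main(String[] args) {
        long fileSizeBytes = 10L * 1024 * 1024 * 1024; // hypothetical 10 GB Avro file
        long blockSizeBytes = 128L * 1024 * 1024;      // common HDFS block size, 128 MB

        // Ceiling division: each block boundary can start a new split,
        // so a single splittable file still fans out across executors.
        long splits = (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;

        System.out.println(splits); // 80
    }
}
```

By this arithmetic the pressure in the other direction is the small-files problem: many files well below the block size mean more task-scheduling overhead and NameNode metadata, not more parallelism.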
Re: Spark Scala reading from Google Cloud BigQuery table throws error
Hi Mich I wrote a connector to make it easier to connect Bigquery and Spark Have a look here https://github.com/samelamin/spark-bigquery/ Your feedback is always welcome Kind Regards Sam On Tue, Dec 18, 2018 at 7:46 PM Mich Talebzadeh wrote: > Thanks Jorn. I will try that. Requires installing sbt etc on ephemeral > compute server in Google Cloud to built an uber jar file. > > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Tue, 18 Dec 2018 at 11:16, Jörn Franke wrote: > >> Maybe the guava version in your spark lib folder is not compatible (if >> your Spark version has a guava library)? In this case i propose to create a >> fat/uber jar potentially with a shaded guava dependency. 
>> >> Am 18.12.2018 um 11:26 schrieb Mich Talebzadeh > >: >> >> Hi, >> >> I am writing a small test code in spark-shell with attached jar >> dependencies >> >> spark-shell --jars >> /home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar >> >> to read an already existing table in Google BigQuery as follows: >> >> import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration >> import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat >> import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat >> import >> com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration >> import >> com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat >> import com.google.gson.JsonObject >> import org.apache.hadoop.io.LongWritable >> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat >> // Assumes you have a spark context (sc) -- running from spark-shell REPL. >> // Marked as transient since configuration is not Serializable. This >> should >> // only be necessary in spark-shell REPL. >> @transient >> val conf = sc.hadoopConfiguration >> // Input parameters. >> val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868" >> val projectId = conf.get("fs.gs.project.id") >> val bucket = conf.get("fs.gs.system.bucket") >> // Input configuration. 
>> conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId) >> conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket) >> BigQueryConfiguration.configureBigQueryInput(conf, >> fullyQualifiedInputTableId) >> >> The problem I have is that even after loading jars with spark-shell --jar >> >> I am getting the following error at the last line >> >> scala> BigQueryConfiguration.configureBigQueryInput(conf, >> fullyQualifiedInputTableId) >> >> java.lang.NoSuchMethodError: >> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V >> at >> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68) >> at >> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260) >> ... 49 elided >> >> It says it cannot find method >> >> java.lang.NoSuchMethodError: >> com.google.common.base.Preconditions.checkArgument >> >> but I checked it and it is in the following jar file >> >> jar tvf guava-19.0.jar| grep common.base.Preconditions >> 5249 Wed Dec 09 15:58:14 UTC 2015 >> com/google/common/base/Preconditions.class >> >> I have used different version of guava jar files but none works! >> >> The code is based on the following: >> >> >> https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example >> >> Thanks >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >>
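Jörn's suggestion of a fat/uber jar with a shaded Guava can be sketched with the sbt-assembly plugin. This is only an illustrative fragment, not taken from the thread — the plugin version and the `shaded.` relocation prefix are placeholder choices:

```scala
// project/plugins.sbt (assumes the sbt-assembly plugin is acceptable)
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt: relocate Guava's classes inside the uber jar so the
// connector always finds the Preconditions.checkArgument overload it
// was compiled against, regardless of which Guava Spark itself ships.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)
```

Submitting the resulting assembly with spark-submit sidesteps the NoSuchMethodError above, because the rewritten bytecode calls the relocated `shaded.com.google.common.base.Preconditions` rather than whichever Guava happens to win on the driver's classpath.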
Why is the max iteration for svd not configurable in mllib?
https://github.com/apache/spark/blob/f5aba657396bd4e2e03dd06491a2d169a99592a7/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L191 maxIter is set to max(300, 3 * # singular values). Is there a particular reason for this? And if not, would it be appropriate to submit a PR exposing that parameter? I have not contributed to Spark before, so I don’t know if a small API change like that would require a discussion beforehand. Thanks! Sam
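For readers who do not want to chase the link, the hard-coded default boils down to a one-line formula. A sketch in Python (not the Scala source itself) of the cap described above:

```python
def default_max_iterations(k: int) -> int:
    """Mirror of the hard-coded ARPACK iteration cap in
    RowMatrix.computeSVD: max(300, 3 * number of requested singular
    values), with no user-facing parameter to override it."""
    return max(300, 3 * k)

# Small k is dominated by the 300 floor; large k scales linearly.
print(default_max_iterations(10))   # 300
print(default_max_iterations(150))  # 450
```

So for modest k the 300 floor always wins, which is exactly why a caller who needs more (or fewer) iterations currently has no knob to turn.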
Re: from_json()
Hi jg, Perhaps I am misunderstanding you, but if you just want to create a new schema from a df it's fairly simple, assuming you have a schema already predefined or in a string. i.e. val newSchema = DataType.fromJson(json_schema_string) then all you need to do is re-create the DataFrame using this new schema: sqlContext.createDataFrame(oldDF.rdd, newSchema) Regards Sam On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin <jper...@lumeris.com> wrote: > Is there a way to not have to specify a schema when using from_json() or > infer the schema? When you read a JSON doc from disk, you can infer the > schema. Should I write it to disk before (ouch)? > > > > jg > -- > > This electronic transmission and any documents accompanying this > electronic transmission contain confidential information belonging to the > sender. This information may contain confidential health information that > is legally privileged. The information is intended only for the use of the > individual or entity named above. The authorized recipient of this > transmission is prohibited from disclosing this information to any other > party unless required to do so by law or regulation and is required to > delete or destroy the information after its stated need has been fulfilled. > If you are not the intended recipient, you are hereby notified that any > disclosure, copying, distribution or the taking of any action in reliance > on or regarding the contents of this electronically transmitted information > is strictly prohibited. If you have received this E-mail in error, please > notify the sender and delete this message immediately. >
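jg's underlying question — inferring a schema from in-memory JSON without a round-trip to disk — can also be approached by parsing one sample record and deriving field types from it. A language-agnostic sketch of that inference step in plain Python (these are not Spark API calls; the function name and type mapping are invented for illustration):

```python
import json

def infer_field_types(sample_json: str) -> dict:
    """Derive a crude field-name -> type-name mapping from one sample
    record. A real Spark StructType distinguishes many more cases
    (nested structs, arrays, nullability); this only illustrates the
    inference idea."""
    record = json.loads(sample_json)
    type_names = {bool: "boolean", int: "long", float: "double", str: "string"}
    return {k: type_names.get(type(v), "string") for k, v in record.items()}

print(infer_field_types('{"id": 7, "name": "jg", "score": 1.5}'))
# {'id': 'long', 'name': 'string', 'score': 'double'}
```

The trade-off is the same one Spark makes when reading JSON from disk: inference is only as good as the sample records it sees, which is why a predefined schema string, as suggested above, is the safer option in production.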
Re: [ANNOUNCE] Announcing Apache Spark 2.2.0
Well done! This is amazing news :) Congrats and really can't wait to spread the structured streaming love! On Mon, Jul 17, 2017 at 5:25 PM, kant kodali wrote: > +1 > > On Tue, Jul 11, 2017 at 3:56 PM, Jean Georges Perrin wrote: > >> Awesome! Congrats! Can't wait!! >> >> jg >> >> >> On Jul 11, 2017, at 18:48, Michael Armbrust >> wrote: >> >> Hi all, >> >> Apache Spark 2.2.0 is the third release of the Spark 2.x line. This >> release removes the experimental tag from Structured Streaming. In >> addition, this release focuses on usability, stability, and polish, >> resolving over 1100 tickets. >> >> We'd like to thank our contributors and users for their contributions and >> early feedback to this release. This release would not have been possible >> without you. >> >> To download Spark 2.2.0, head over to the download page: >> http://spark.apache.org/downloads.html >> >> To view the release notes: https://spark.apache.org/releases/spark-release-2-2-0.html >> >> *(note: If you see any issues with the release notes, webpage or >> published artifacts, please contact me directly off-list) * >> >> Michael >> >> >
Re: UDAFs for sketching Dataset columns with T-Digests
This is interesting and very useful. Thanks. On Thu, Jul 6, 2017 at 2:33 AM, Erik Erlandson wrote: > After my talk on T-Digests in Spark at Spark Summit East, there were some > requests for a UDAF-based interface for working with Datasets. I'm > pleased to announce that I released a library for doing T-Digest sketching > with UDAFs: > > https://github.com/isarn/isarn-sketches-spark > > This initial release provides support for Scala. Future releases will > support PySpark bindings, and additional tools for leveraging T-Digests in > ML pipelines. > > Cheers! > Erik >
Re: Restful API Spark Application
Hi Nipun Have you checked out the job server? https://github.com/spark-jobserver/spark-jobserver Regards Sam On Fri, 12 May 2017 at 21:00, Nipun Arora <nipunarora2...@gmail.com> wrote: > Hi, > > We have written a java spark application (primarily uses spark sql). We > want to expand this to provide our application "as a service". For this, we > are trying to write a REST API. While a simple REST API can be easily made, > and I can get Spark to run through the launcher. I wonder, how the spark > context can be used by service requests, to process data. > > Are there any simple JAVA examples to illustrate this use-case? I am sure > people have faced this before. > > > Thanks > Nipun >
Re: Spark Testing Library Discussion
Hi Lucas, Thanks for the detailed feedback, that's really useful! I did suggest GitHub but my colleague asked for an email You raise a good point with the grammar, sure I will rephrase it. I am more than happy to merge in the PR if you send it. That said, I know you can make BDD tests using any framework, but I am a lazy developer and would rather use the framework or library defaults to make it easier for other devs to pick up. The number of rows is only a start, correct; we can add more tests to check the transformed version, but I was going to point that out in a future part of the series since this one is mainly about raw extracts. Thank you very much for the feedback and I will be sure to add it once I have more feedback Maybe we can create a gist of all this or even a tiny book on best practices if people find it useful Looking forward to the PR! Regards Sam On Sat, 29 Apr 2017 at 06:36, lucas.g...@gmail.com <lucas.g...@gmail.com> wrote: > Awesome, thanks. > > Just reading your post > > A few observations: > 1) You're giving out Marius's email: "I have been lucky enough to > build this pipeline with the amazing Marius Feteanu". A LinkedIn or > GitHub link might be more helpful. > > 2) "If you are in Pyspark world sadly Holden’s test base wont work so > I suggest you check out Pytest and pytest-bdd.". doesn't read well to > me, on first read I was wondering if Spark-Test-Base wasn't available > in python... It took me about 20 seconds to figure out that you > probably meant it doesn't allow for direct BDD semantics. My 2nd > observation here is that BDD semantics can be aped in any given > testing framework. You just need to be flexible :) > > 3) You're doing a transformation (IE JSON input against a JSON > schema). You are testing for # of rows which is a good start. But I > don't think that really exercises a test against your JSON schema. I > tend to view schema as the things that need the most rigorous testing > (it's code after all). 
IE I would want to confirm that the output > matches the expected shape and values after being loaded against the > schema. > > I saw a few minor spelling and grammatical issues as well. I put a PR > into your blog for them. I won't be offended if you squish it :) > > I should be getting into our testing 'how-to' stuff this week. I'll > scrape our org specific stuff and put it up to github this week as > well. It'll be in python so maybe we'll get both use cases covered > with examples :) > > G > > On 27 April 2017 at 03:46, Sam Elamin <hussam.ela...@gmail.com> wrote: > > Hi > > > > @Lucas I certainly would love to write an integration testing library for > > workflows, I have a few ideas I would love to share with others and they > are > > focused around Airflow since that is what we use > > > > > > As promised here is the first blog post in a series of posts I hope to > write > > on how we build data pipelines > > > > Please feel free to retweet my original tweet and share because the more > > ideas we have the better! > > > > Feedback is always welcome! > > > > Regards > > Sam > > > > On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com > > <lucas.g...@gmail.com> wrote: > >> > >> Hi all, whoever (Sam I think) was going to do some work on doing a > >> template testing pipeline. I'd love to be involved, I have a current > task > >> in my day job (data engineer) to flesh out our testing how-to / best > >> practices for Spark jobs and I think I'll be doing something very > similar > >> for the next week or 2. > >> > >> I'll scrape out what i have now in the next day or so and put it up in a > >> gist that I can share too. 
> >> > >> G > >> > >> On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote: > >>> > >>> Urgh hangouts did something frustrating, updated link > >>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe > >>> > >>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau <hol...@pigscanfly.ca> > >>> wrote: > >>>> > >>>> The (tentative) link for those interested is > >>>> https://hangouts.google.com/hangouts/_/oyjvcnffejcjhi6qazf3lysypue . > >>>> > >>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau <hol...@pigscanfly.ca> > >>>> wrote: > >>>>> > >>>>> So 14 people have said they are available on Tuesday the 25th at 1PM > >>>>> pacific so we will do this meeting then ( > >>>>
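Lucas's third observation in the thread above — assert on shape and values, not only row counts — can be sketched independently of any Spark testing framework. A toy pytest-style check against a plain-Python stand-in for the schema-load step (the function names and fixture data are invented for illustration):

```python
def apply_schema(raw_rows):
    """Stand-in for the JSON-against-schema load step under test:
    coerce each raw record into the expected (id, amount) shape."""
    return [{"id": int(r["id"]), "amount": float(r["amount"])} for r in raw_rows]

def test_transform_shape_and_values():
    raw = [{"id": "1", "amount": "9.99"}, {"id": "2", "amount": "0.50"}]
    out = apply_schema(raw)
    # Row count is a good start...
    assert len(out) == 2
    # ...but also pin down the column set and a few concrete values,
    # which is what actually exercises the schema.
    assert set(out[0]) == {"id", "amount"}
    assert out[0] == {"id": 1, "amount": 9.99}

test_transform_shape_and_values()
```

The same three assertions translate directly to a DataFrame test (count, schema fields, collected rows), whichever library ends up running them.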
Re: Spark Testing Library Discussion
Hi @Lucas I certainly would love to write an integration testing library for workflows, I have a few ideas I would love to share with others and they are focused around Airflow since that is what we use As promised here <https://samelamin.github.io/2017/04/27/Building-A-Datapipeline-part1/> is the first blog post in a series of posts I hope to write on how we build data pipelines Please feel free to retweet my original tweet <https://twitter.com/samelamin/status/857546231492612096> and share because the more ideas we have the better! Feedback is always welcome! Regards Sam On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com <lucas.g...@gmail.com > wrote: > Hi all, whoever (Sam I think) was going to do some work on doing a > template testing pipeline. I'd love to be involved, I have a current task > in my day job (data engineer) to flesh out our testing how-to / best > practices for Spark jobs and I think I'll be doing something very similar > for the next week or 2. > > I'll scrape out what i have now in the next day or so and put it up in a > gist that I can share too. > > G > > On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote: > >> Urgh hangouts did something frustrating, updated link >> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe >> >> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau <hol...@pigscanfly.ca> >> wrote: >> >>> The (tentative) link for those interested is https://hangouts.google.com >>> /hangouts/_/oyjvcnffejcjhi6qazf3lysypue . >>> >>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau <hol...@pigscanfly.ca> >>> wrote: >>> >>>> So 14 people have said they are available on Tuesday the 25th at 1PM >>>> pacific so we will do this meeting then ( https://doodle.com/poll/69y6 >>>> yab4pyf7u8bn ). >>>> >>>> Since hangouts tends to work ok on the Linux distro I'm running my >>>> default is to host this as a "hangouts-on-air" unless there are alternative >>>> ideas. 
>>>> >>>> I'll record the hangout and if it isn't terrible I'll post it for those >>>> who weren't able to make it (and for next time I'll include more European >>>> friendly time options - Doodle wouldn't let me update it once posted). >>>> >>>> On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau <hol...@pigscanfly.ca> >>>> wrote: >>>> >>>>> Hi Spark Users (+ Some Spark Testing Devs on BCC), >>>>> >>>>> Awhile back on one of the many threads about testing in Spark there >>>>> was some interest in having a chat about the state of Spark testing and >>>>> what people want/need. >>>>> >>>>> So if you are interested in joining an online (with maybe an IRL >>>>> component if enough people are SF based) chat about Spark testing please >>>>> fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn >>>>> >>>>> I think reasonable topics of discussion could be: >>>>> >>>>> 1) What is the state of the different Spark testing libraries in the >>>>> different core (Scala, Python, R, Java) and extended languages (C#, >>>>> Javascript, etc.)? >>>>> 2) How do we make these more easily discovered by users? >>>>> 3) What are people looking for in their testing libraries that we are >>>>> missing? (can be functionality, documentation, etc.) >>>>> 4) Are there any examples of well tested open source Spark projects >>>>> and where are they? >>>>> >>>>> If you have other topics that's awesome. >>>>> >>>>> To clarify this about libraries and best practices for people testing >>>>> their Spark applications, and less about testing Spark's internals >>>>> (although as illustrated by some of the libraries there is some strong >>>>> overlap in what is required to make that work). 
>>>>> >>>>> Cheers, >>>>> >>>>> Holden :) >>>>> >>>>> -- >>>>> Cell : 425-233-8271 <(425)%20233-8271> >>>>> Twitter: https://twitter.com/holdenkarau >>>>> >>>> >>>> >>>> >>>> -- >>>> Cell : 425-233-8271 <(425)%20233-8271> >>>> Twitter: https://twitter.com/holdenkarau >>>> >>> >>> >>> >>> -- >>> Cell : 425-233-8271 <(425)%20233-8271> >>> Twitter: https://twitter.com/holdenkarau >>> >> >> >> >> -- >> Cell : 425-233-8271 <(425)%20233-8271> >> Twitter: https://twitter.com/holdenkarau >> > >
Re: help/suggestions to setup spark cluster
Hi Anna There are a variety of options for launching Spark clusters. I doubt people run Spark in a single EC2 instance, certainly not in production I don't think. I don't have enough information on what you are trying to do, but if you are just trying to set things up from scratch then I think you can just use EMR, which will create a cluster for you and attach a Zeppelin instance as well. You can also use Databricks for ease of use and very little management, but you will pay a premium for that abstraction Regards Sam On Wed, 26 Apr 2017 at 22:02, anna stax <annasta...@gmail.com> wrote: > I need to setup a spark cluster for Spark streaming and scheduled batch > jobs and adhoc queries. > Please give me some suggestions. Can this be done in standalone mode. > > Right now we have a spark cluster in standalone mode on AWS EC2 running > spark streaming application. Can we run spark batch jobs and zeppelin on > the same. Do we need a better resource manager like Mesos? > > Are there any companies or individuals that can help in setting this up? > > Thank you. > > -Anna >
Re: How to convert Dstream of JsonObject to Dataframe in spark 2.1.0?
You have 2 options: 1) Clean -> Write your own parser to go through each property and create a dataset 2) Hacky but simple -> Convert to a JSON string then read it in using spark.read.json(jsonString) Please bear in mind the second option is expensive, which is why it is hacky I wrote my own parser here <https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/converters/SchemaConverters.scala> which you can use to convert between JsonObjects and StructType schemas Regards Sam On Sun, Apr 23, 2017 at 7:50 PM, kant kodali <kanth...@gmail.com> wrote: > Hi All, > > How to convert Dstream of JsonObject to Dataframe in spark 2.1.0? That > JsonObject is from Gson Library. > > Thanks! >
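Stripped of the Spark and Gson specifics, the "hacky but simple" option above is just a serialise-then-reparse round-trip, which is also why it is expensive. A plain-Python illustration of the principle (not the actual DStream code):

```python
import json

# Pretend these are objects coming off the stream (Gson JsonObjects
# in the original question).
events = [{"user": "a", "clicks": 3}, {"user": "b", "clicks": 5}]

# Option 2: serialise every object to a JSON string...
json_lines = [json.dumps(e) for e in events]

# ...then re-parse each string, which is essentially what
# spark.read.json does per record. Paying for a full serialise plus
# parse pass on every element is the cost of the shortcut.
parsed = [json.loads(line) for line in json_lines]
print(parsed == events)  # True
```

A hand-written converter (option 1) walks each object's properties once and skips both passes, which is the clean route the reply recommends.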
Deploying Spark Applications. Best Practices And Patterns
Hi All, Really useful information on this thread. We moved a bit off topic since the initial question was how to schedule Spark jobs in AWS. I do think however that there are loads of great insights here within the community, so I have renamed the subject to "Deploying Spark Applications. Best Practices". I honestly think it is a great opportunity to share knowledge and views of what best practices are, and hopefully lead to people building and shipping reliable, scalable data pipelines. Also, can I reiterate what Steve mentioned: can we all remember this mailing list is aimed to help people, so can we please be constructive with our feedback :) Here is my 2 cents: So as someone who has come from a web development background and transitioned into the data space, I can assure you that I completely agree with Steve in that the need for consistent, repeatable deployments is essential for shipping software reliably; it is the very essence of Continuous Deployment. As a side note, the difference between continuous deployment and continuous delivery is that switch to turn the lights on. You can continuously deploy to an environment - but it is a business decision if you want to flick that switch to make that feature available to your customer. Basically canary releases AKA dark releases. I have seen my fair share of "cowboy" shops where the idea of deploying your application is a manual copy/paste by a human, and that is absolutely the worst way to be deploying things; when humans are involved in any step of your pipeline things can and inevitably will go wrong. This is exactly why CI tools have been brought into play, to allow integration of all your code across teams/branches and ensure code compiles and tests pass regardless of what the code is meant to do; whether it's a website, application, library or framework is irrelevant. 
Not everyone is going to agree with this, but in my humble opinion "big data" products are in their infancy; the traditional ETL scripts consisted of bespoke Python scripts that at most did simple transformations. There is no right way and there certainly isn't an industry standard as of yet. There are definitely loads of wrong ways, and I am sure we have all seen/done our fair share of "horror" stories as Steve eloquently put it. Tools like Talend and Pentaho came into play to try and simplify this process; the UI just said point me to a database, click click click and you have a pipeline in place. When it comes to scheduling Spark jobs, you can either submit to an already running cluster using things like Oozie or bash scripts, or have a workflow manager like Airflow or Data Pipeline to create new clusters for you. We went down the second route to continue with the whole immutable infrastructure / "treat your servers as cattle not pets" approach. We are facing two problems with this at the moment: 1) Testing and versioning of data in Spark applications: We solved this by using Holden's Spark test base, which works amazingly, but the nature of data warehousing means people want ALL the columns available, so you have to go against your nature as an engineer to keep things simple; the mentality of an analyst or a data scientist is to throw the kitchen sink in - literally any data available should be in the final transformed table. This basically means you either do not test the generated data or your code becomes super verbose and coupled, making it a nightmare to maintain, which defeats the purpose of testing in the first place. Not to mention the nuances of the data sources coming in, e.g. data arriving in the wrong shape, wrong order or wrong format, or in some cases not at all. You need to test for all of that and deal with it or you will get burnt in production. 
You do not want to be in that call when your stakeholders are asking why their reports are not updated, or worse are showing no data! 2) Testing and deploying the workflow manager: We needed to ensure deployments were easy; we were basically masochists here, i.e. if deployment is painful then do it more often until it isn't. The problem is there isn't a clean way to test Airflow other than running the DAGs themselves, so we had to parameterise them to push test data through our pipeline and ensure that the transformed tables were generated correctly (a simple S3 lookup for us). We are still in the early days here, so I am happy to hear feedback on how to do it better. I realise this is a very very long email and would probably be better explained in a blog post, but hey, this is the gist of it. If people are still interested I can write it up as a blog post, adding code samples and nice diagrams! Kind Regards Sam On Wed, Apr 12, 2017 at 7:33 PM, lucas.g...@gmail.com <lucas.g...@gmail.com> wrote: > "Building data products is a very different discipline from that of > building software." > > That is a fundamentally incorrect assumption. > > There will alwa
Re: Spark Streaming. Real-time save data and visualize on dashboard
Hi To be honest there are a variety of options, but it all comes down to who will be querying these dashboards. If the end user is an engineer then the ELK stack is fine, and I can attest to the ease of use of Kibana since I used it quite heavily. On the other hand, in my experience it isn't the engineers that are in charge of reporting, so if the end user is a data analyst or data scientist then they are most comfortable using SQL and would be slightly averse to learning the nuances of creating dashboards and using Elasticsearch. Trust me, no matter how much you try, these folks are more comfortable using SQL and Tableau-like platforms. So you will have to educate them, not to mention the fact that any new hire will have to undergo the same training to be productive. My suggestion for that is to push your data to Google BigQuery <https://cloud.google.com/bigquery/>. It really is simple to use and people can just focus on writing their queries. It also returns within seconds for queries over terabytes of data. The caveat here is that you are paying per query. But it's $5 for 1 TB, which is peanuts really. It's a managed service, so there are zero setup and management costs compared to the other services. I suppose in the end you are paying to abstract that knowledge away. Happy to answer any questions you might have Kind Regards Sam On Wed, 12 Apr 2017 at 09:36, tencas <diego...@gmail.com> wrote: > Hi Gaurav1809 , > > I was thinking about using elasticsearch + kibana too (actually don't know > the differences between ELK and elasticsearch). > I was wondering about pros and cons of using a document indexer vs NoSQL > database. > > > > -- > View this message in context: http://apache-spark-user-list. > 1001560.n3.nabble.com/Spark-Streaming-Real-time-save-data- > and-visualize-on-dashboard-tp28587p28589.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Re: optimising storage and ec2 instances
Hi Zeming Yu, Steve Just to add, we are also going down this partitioning route, but you should know that if you are in AWS land you are most likely going to use EMR at any given time. At the moment EMR does not do recursive search on wildcards, see this <http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-troubleshoot-errors-io.html#recurseinput> However Spark seems to be able to deal with it fine, so if you don't have a data serving layer for your customers then you should be fine Regards sam On Tue, Apr 11, 2017 at 1:21 PM, Zeming Yu <zemin...@gmail.com> wrote: > everything works best if your sources are a few tens to hundreds of MB or > more > > Are you referring to the size of the zip file or individual unzipped files? > > Any issues with storing a 60 mb zipped file containing heaps of text files > inside? > > On 11 Apr. 2017 9:09 pm, "Steve Loughran" <ste...@hortonworks.com> wrote: > >> >> > On 11 Apr 2017, at 11:07, Zeming Yu <zemin...@gmail.com> wrote: >> > >> > Hi all, >> > >> > I'm a beginner with spark, and I'm wondering if someone could provide >> guidance on the following 2 questions I have. >> > >> > Background: I have a data set growing by 6 TB p.a. I plan to use spark >> to read in all the data, manipulate it and build a predictive model on it >> (say GBM) I plan to store the data in S3, and use EMR to launch spark, >> reading in data from S3. >> > >> > 1. Which option is best for storing the data on S3 for the purpose of >> analysing it in EMR spark? >> > Option A: storing the 6TB file as 173 million individual text files >> > Option B: zipping up the above 173 million text files as 240,000 zip >> files >> > Option C: appending the individual text files, so have 240,000 text >> files p.a. >> > Option D: combining the text files even further >> > >> >> everything works best if your sources are a few tens to hundreds of MB or >> more of your data, work can be partitioned up by file. 
If you use more >> structured formats (avro compressed with snappy, etc), you can throw > 1 >> executor at work inside a file. Structure is handy all round, even if its >> just adding timestamp and provenance columns to each data file. >> >> there's the HAR file format from Hadoop which can merge lots of small >> files into larger ones, allowing work to be scheduled per har file. >> Recommended for HDFS as it hates small files, on S3 you still have limits >> on small files (including throttling of HTTP requests to shards of a >> bucket), but they are less significant. >> >> One thing to be aware is that the s3 clients spark use are very >> inefficient in listing wide directory trees, and Spark not always the best >> at partitioning work because of this. You can accidentally create a very >> inefficient tree structure like datasets/year=2017/month=5/day=10/hour=12/, >> with only one file per hour. Listing and partitioning suffers here, and >> while s3a on Hadoop 2.8 is better here, Spark hasn't yet fully adapted to >> those changes (use of specific API calls). There's also a lot more to be >> done in S3A to handle wildcards in the directory tree much more efficiently >> (HADOOP-13204); needs to address pattens like >> (datasets/year=201?/month=*/day=10) >> without treewalking and without fetching too much data from wildcards near >> the top of the tree. We need to avoid implementing something which works >> well on *my* layouts, but absolutely dies on other people's. As is usual in >> OSS, help welcome; early testing here as critical as coding, so as to >> ensure things will work with your file structures >> >> -Steve >> >> >> > 2. Any recommendations on the EMR set up to analyse the 6TB of data all >> at once and build a GBM, in terms of >> > 1) The type of EC2 instances I need? >> > 2) The number of such instances I need? >> > 3) Rough estimate of cost? >> > >> >> no opinion there >> >> > >> > Thanks so much, >> > Zeming >> > >> >>
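Steve's warning above about over-fine trees like datasets/year=2017/month=5/day=10/hour=12/ is partly a numbers game: every extra partition level multiplies the leaf directories a wildcard lister must enumerate (and, on S3, the LIST requests it issues). Back-of-envelope arithmetic, assuming one leaf directory per time bucket:

```python
# Leaf directories a lister must touch for one year of data, at two
# partitioning granularities (illustrative arithmetic only).
days = 365
day_level = days          # .../year=2017/month=5/day=10/
hour_level = days * 24    # .../day=10/hour=12/ -- 24x the listings
print(day_level, hour_level)  # 365 8760
```

With only one small file per hour, the hour-level layout pays 24x the listing cost for no extra parallelism, which is the trap described above.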
Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?
Hi Steve Thanks for the detailed response, I think this problem doesn't have an industry standard solution as of yet, and I am sure a lot of people would benefit from the discussion. I realise now what you are saying, so thanks for clarifying; that said, let me try and explain how we approached the problem. There are 2 problems you highlighted: the first is moving the code from SCM to prod, and the other is ensuring the data your code uses is correct (using the latest data from prod). *"how do you get your code from SCM into production?"* We currently have our pipeline being run via Airflow, and we have our DAGs in S3. With regards to how we get our code from SCM to production: 1) Jenkins build that builds our Spark applications and runs tests 2) Once the first build is successful we trigger another build to copy the DAGs to an S3 folder We then routinely sync this folder to the local Airflow DAGs folder every X amount of mins Re test data *" but what's your strategy for test data: that's always the troublespot."* Our application is using versioning against the data, so we expect the source data to be in a certain version and the output data to also be in a certain version. We have a test resources folder that follows the same convention of versioning - this is the data that our application tests use - to ensure that the data is in the correct format. So for example, if we have Table X with version 1 that depends on data from Tables A and B, also version 1, we run our Spark application then ensure the transformed table X has the correct columns and row values. Then when we have a new version 2 of the source data, or add a new column in Table X (version 2), we generate a new version of the data and ensure the tests are updated. That way we ensure any new version of the data has tests against it. *"I've never seen any good strategy there short of "throw it at a copy of the production dataset"."* I agree, which is why we have a sample of the production data and version the 
schemas we expect the source and target data to look like. If people are interested I am happy writing a blog about it in the hopes this helps people build more reliable pipelines Kind Regards Sam On Tue, Apr 11, 2017 at 11:31 AM, Steve Loughran <ste...@hortonworks.com> wrote: > > On 7 Apr 2017, at 18:40, Sam Elamin <hussam.ela...@gmail.com> wrote: > > Definitely agree with gourav there. I wouldn't want jenkins to run my work > flow. Seems to me that you would only be using jenkins for its scheduling > capabilities > > > Maybe I was just looking at this differenlty > > Yes you can run tests but you wouldn't want it to run your orchestration > of jobs > > What happens if jenkijs goes down for any particular reason. How do you > have the conversation with your stakeholders that your pipeline is not > working and they don't have data because the build server is going through > an upgrade or going through an upgrade > > > > Well, I wouldn't use it as a replacement for Oozie, but I'd certainly > consider as the pipeline for getting your code out to the cluster, so you > don't have to explain why you just pushed out something broken > > As example, here's Renault's pipeline as discussed last week in Munich > https://flic.kr/p/Tw3Emu > > However to be fair I understand what you are saying Steve if someone is in > a place where you only have access to jenkins and have to go through hoops > to setup:get access to new instances then engineers will do what they > always do, find ways to game the system to get their work done > > > > > This isn't about trying to "Game the system", this is about what makes a > replicable workflow for getting code into production, either at the press > of a button or as part of a scheduled "we push out an update every night, > rerun the deployment tests and then switch over to the new installation" > mech. > > Put differently: how do you get your code from SCM into production? 
Not > just for CI, but what's your strategy for test data: that's always the > troublespot. Random selection of rows may work, although it will skip the > odd outlier (high-unicode char in what should be a LATIN-1 field, time set > to 0, etc), and for work joining > 1 table, you need rows which join well. > I've never seen any good strategy there short of "throw it at a copy of the > production dataset". > > > -Steve > > > > > > > On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta <gourav.sengu...@gmail.com> > wrote: > >> Hi Steve, >> >> Why would you ever do that? You are suggesting the use of a CI tool as a >> workflow and orchestration engine. >> >> Regards, >> Gourav Sengupta >> >> On Fri, Apr 7, 2017 at
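The versioning convention described in the reply above can be sketched in plain Python. All paths, helper names, and schemas here are hypothetical illustrations, not the actual pipeline's code:

```python
# Hypothetical sketch of versioned test resources: source tables A and B at
# version 1 produce target table X at version 1; bumping the source or target
# schema means adding a new versioned folder plus matching test expectations.

def resource_path(table: str, version: int) -> str:
    """Build the conventional test-resources path for a table at a version."""
    return f"test/resources/{table}/v{version}/data.json"

def expected_schema(table: str, version: int) -> list:
    """Hypothetical registry of expected output columns per version."""
    schemas = {
        ("X", 1): ["id", "a_value", "b_value"],
        ("X", 2): ["id", "a_value", "b_value", "new_column"],
    }
    return schemas[(table, version)]

def check_output(rows: list, table: str, version: int) -> bool:
    """Check every transformed row has exactly the columns for this version."""
    cols = set(expected_schema(table, version))
    return all(set(r) == cols for r in rows)

# Version 1 output rows from the (hypothetical) transformation:
v1_rows = [{"id": 1, "a_value": "a", "b_value": "b"}]
assert check_output(v1_rows, "X", 1)
# The same rows fail against version 2, forcing a test-data update:
assert not check_output(v1_rows, "X", 2)
```

The point of the convention is that adding version 2 of a table cannot silently reuse version 1 test data: the schema check forces a new versioned fixture.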
Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?
Definitely agree with Gourav there. I wouldn't want Jenkins to run my workflow. Seems to me that you would only be using Jenkins for its scheduling capabilities. Yes you can run tests, but you wouldn't want it to run your orchestration of jobs. What happens if Jenkins goes down for any particular reason? How do you have the conversation with your stakeholders that your pipeline is not working and they don't have data because the build server is going through an upgrade?

However, to be fair, I understand what you are saying Steve: if someone is in a place where you only have access to Jenkins and has to go through hoops to set up/get access to new instances, then engineers will do what they always do, find ways to game the system to get their work done.

On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi Steve, > > Why would you ever do that? You are suggesting the use of a CI tool as a > workflow and orchestration engine. > > Regards, > Gourav Sengupta > > On Fri, Apr 7, 2017 at 4:07 PM, Steve Loughran <ste...@hortonworks.com> > wrote: > >> If you have Jenkins set up for some CI workflow, that can do scheduled >> builds and tests. Works well if you can do some build test before even >> submitting it to a remote cluster >> >> On 7 Apr 2017, at 10:15, Sam Elamin <hussam.ela...@gmail.com> wrote: >> >> Hi Shyla >> >> You have multiple options really some of which have been already listed >> but let me try and clarify >> >> Assuming you have a spark application in a jar you have a variety of >> options >> >> You have to have an existing spark cluster that is either running on EMR >> or somewhere else. 
>> >> *Super simple / hacky* >> A cron job on EC2 that calls a simple shell script that does a spark-submit >> to a Spark cluster, OR create or add a step to an EMR cluster >> >> *More Elegant* >> Airflow/Luigi/AWS Data Pipeline (which is just CRON in the UI) that will >> do the above step but have scheduling and potential backfilling and error >> handling (retries, alerts etc) >> >> AWS are coming out with Glue <https://aws.amazon.com/glue/> soon that >> does some Spark jobs but I do not think it's available worldwide just yet >> >> Hope I cleared things up >> >> Regards >> Sam >> >> >> On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta < >> gourav.sengu...@gmail.com> wrote: >> >>> Hi Shyla, >>> >>> why would you want to schedule a spark job in EC2 instead of EMR? >>> >>> Regards, >>> Gourav >>> >>> On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande < >>> deshpandesh...@gmail.com> wrote: >>> >>>> I want to run a spark batch job maybe hourly on AWS EC2 . What is the >>>> easiest way to do this. Thanks >>>> >>> >>> >> >> >
Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?
Hi Shyla,

You have multiple options really, some of which have already been listed, but let me try to clarify. Assuming you have a Spark application in a jar, you have a variety of options. You have to have an existing Spark cluster that is either running on EMR or somewhere else.

*Super simple / hacky*
A cron job on EC2 that calls a simple shell script that does a spark-submit to a Spark cluster, OR create or add a step to an EMR cluster.

*More Elegant*
Airflow/Luigi/AWS Data Pipeline (which is just CRON in the UI) that will do the above step but have scheduling and potential backfilling and error handling (retries, alerts etc).

AWS are coming out with Glue <https://aws.amazon.com/glue/> soon that does some Spark jobs, but I do not think it's available worldwide just yet.

Hope I cleared things up.

Regards
Sam

On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi Shyla, > > why would you want to schedule a spark job in EC2 instead of EMR? > > Regards, > Gourav > > On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande <deshpandesh...@gmail.com> > wrote: > >> I want to run a spark batch job maybe hourly on AWS EC2 . What is the >> easiest way to do this. Thanks >> > >
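The "super simple / hacky" option above can be sketched as a cron-driven shell script. The master URL, jar path, and class name below are placeholders, and the command is echoed rather than executed, since this is only a sketch:

```shell
#!/bin/sh
# Hypothetical crontab entry running the job at the top of every hour:
#   0 * * * * /opt/jobs/run_batch.sh >> /var/log/batch.log 2>&1

# Placeholder values -- substitute your real master URL, jar and main class.
MASTER="spark://my-cluster:7077"
APP_JAR="/opt/jobs/my-batch-job.jar"
MAIN_CLASS="com.example.BatchJob"

# Build the spark-submit invocation; echoed here instead of executed.
CMD="spark-submit --master $MASTER --class $MAIN_CLASS $APP_JAR"
echo "$CMD"
```

For EMR the equivalent would be an `aws emr add-steps` call instead of a direct spark-submit, but the cron wrapper shape is the same.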
Re: Executor unable to pick postgres driver in Spark standalone cluster
Hi Rishikesh,

Sounds like the postgres driver isn't being loaded onto the classpath. To debug it, try submitting the application with --jars (note that spark-submit options must come before the application jar), e.g.

spark-submit --jars /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar {application.jar}

If that does not work then there is a problem in the application itself, and the reason it works locally is that you have the dependency on your local classpath.

Regards
Sam

On Mon, Apr 3, 2017 at 2:43 PM, Rishikesh Teke <rishikesht...@gmail.com> wrote: > > Hi all, > > I was submitting the play application to spark 2.1 standalone cluster . In > play application postgres dependency is also added and application works on > local spark libraries. But at run time on standalone cluster it gives me > error : > > o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 1, 172.31.21.3, > executor 1): java.lang.ClassNotFoundException: org.postgresql > .Driver > > I have placed following in spark-defaults.conf directory > > spark.executor.extraClassPath > /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar > spark.driver.extraClassPath > /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar > > Still executors unable to pick the driver. > Am i missing something? Need help . > Thanks. > > > > > > -- > View this message in context: http://apache-spark-user-list. > 1001560.n3.nabble.com/Executor-unable-to-pick-postgres-driver-in-Spark- > standalone-cluster-tp28563.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
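One ordering detail worth spelling out: spark-submit treats everything after the application jar as application arguments, so --jars must come first. A sketch using the driver jar path from this thread (application.jar is a placeholder; the command is echoed rather than executed):

```shell
#!/bin/sh
# spark-submit options such as --jars must come *before* the application
# jar; anything after the jar is passed to the application as arguments.
DRIVER_JAR="/home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar"
APP_JAR="application.jar"   # placeholder name

CMD="spark-submit --jars $DRIVER_JAR $APP_JAR"
echo "$CMD"
```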
Contributing to Spark
Hi All,

I would like to start contributing to Spark if possible; it's an amazing technology and I would love to get involved. The contributing page <http://spark.apache.org/contributing.html> states: "consult the list of starter tasks in JIRA, or ask the user@spark.apache.org mailing list."

Can anyone guide me on where best to start? What are these "starter tasks"? I was thinking adding tests would be a good place to begin when dealing with any new code base, perhaps to PySpark, since Scala seems a bit more stable.

Also - if at all possible - I would really appreciate it if any of the contributors or PMC members would be willing to mentor or guide me in this. Any help would be greatly appreciated!

Regards
Sam
Re: Spark and continuous integration
Thank you both. Steve, that's a very interesting point. I have to admit I have never thought of doing analysis over time on the tests, but it makes sense, as the failures over time tell you quite a bit about your data platform. Thanks for highlighting! We are using PySpark for now, so I hope some frameworks help with that. Previously we have built data sanity checks that look at counts and numbers to produce graphs using statsd and Grafana (ELK stack), but not necessarily looking at test metrics. I'll definitely check it out.

Kind regards
Sam

On Tue, 14 Mar 2017 at 11:57, Jörn Franke <jornfra...@gmail.com> wrote: > I agree the reporting is an important aspect. Sonarqube (or similar tool) > can report over time, but does not support Scala (well indirectly via > JaCoCo). In the end, you will need to think about a dashboard that displays > results over time. > > On 14 Mar 2017, at 12:44, Steve Loughran <ste...@hortonworks.com> wrote: > > > On 13 Mar 2017, at 13:24, Sam Elamin <hussam.ela...@gmail.com> wrote: > > Hi Jorn > > Thanks for the prompt reply, really we have 2 main concerns with CD, > ensuring tests pasts and linting on the code. > > > I'd add "providing diagnostics when tests fail", which is a combination > of: tests providing useful information and CI tooling collecting all those > results and presenting them meaningfully. The hard parts are invariably (at > least for me) > > -what to do about the intermittent failures > -tradeoff between thorough testing and fast testing, especially when > thorough means "better/larger datasets" > > You can consider the output of jenkins & tests as data sources for your > own analysis too: track failure rates over time, test runs over time, etc: > could be interesting. 
If you want to go there, then the question of "which > CI toolings produce the most interesting machine-parseable results, above > and beyond the classic Ant-originated XML test run reports" > > I have mixed feelings about scalatest there: I think the expression > language is good, but the maven test runner doesn't report that well, at > least for me: > > > https://steveloughran.blogspot.co.uk/2016/09/scalatest-thoughts-and-ideas.html > > > > I think all platforms should handle this with ease, I was just wondering > what people are using. > > Jenkins seems to have the best spark plugins so we are investigating that > as well as a variety of other hosted CI tools > > Happy to write a blog post detailing our findings and sharing it here if > people are interested > > > Regards > Sam > > On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke <jornfra...@gmail.com> wrote: > > Hi, > > Jenkins also now supports pipeline as code and multibranch pipelines. thus > you are not so dependent on the UI and you do not need anymore a long list > of jobs for different branches. Additionally it has a new UI (beta) called > blueocean, which is a little bit nicer. You may also check GoCD. Aside from > this you have a huge variety of commercial tools, e.g. Bamboo. > In the cloud, I use for my open source github projects Travis-Ci, but > there are also a lot of alternatives, e.g. Distelli. > > It really depends what you expect, e.g. If you want to Version the build > pipeline in GIT, if you need Docker deployment etc. I am not sure if new > starters should be responsible for the build pipeline, thus I am not sure > that i understand your concern in this area. > > From my experience, integration tests for Spark can be run on any of these > platforms. > > Best regards > > > On 13 Mar 2017, at 10:55, Sam Elamin <hussam.ela...@gmail.com> wrote: > > > > Hi Folks > > > > This is more of a general question. 
What's everyone using for their CI > /CD when it comes to spark > > > > We are using Pyspark but potentially looking to make to spark scala and > Sbt in the future > > > > > > One of the suggestions was jenkins but I know the UI isn't great for new > starters so I'd rather avoid it. I've used team city but that was more > focused on dot net development > > > > > > What are people using? > > > > Kind Regards > > Sam > > > >
Re: Spark and continuous integration
Hi Jörn,

Thanks for the prompt reply. Really we have 2 main concerns with CD: ensuring tests pass, and linting the code. I think all platforms should handle this with ease; I was just wondering what people are using. Jenkins seems to have the best Spark plugins, so we are investigating that as well as a variety of other hosted CI tools.

Happy to write a blog post detailing our findings and sharing it here if people are interested.

Regards
Sam

On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke <jornfra...@gmail.com> wrote: > Hi, > > Jenkins also now supports pipeline as code and multibranch pipelines. thus > you are not so dependent on the UI and you do not need anymore a long list > of jobs for different branches. Additionally it has a new UI (beta) called > blueocean, which is a little bit nicer. You may also check GoCD. Aside from > this you have a huge variety of commercial tools, e.g. Bamboo. > In the cloud, I use for my open source github projects Travis-Ci, but > there are also a lot of alternatives, e.g. Distelli. > > It really depends what you expect, e.g. If you want to Version the build > pipeline in GIT, if you need Docker deployment etc. I am not sure if new > starters should be responsible for the build pipeline, thus I am not sure > that i understand your concern in this area. > > From my experience, integration tests for Spark can be run on any of these > platforms. > > Best regards > > > On 13 Mar 2017, at 10:55, Sam Elamin <hussam.ela...@gmail.com> wrote: > > > > Hi Folks > > > > This is more of a general question. What's everyone using for their CI > /CD when it comes to spark > > > > We are using Pyspark but potentially looking to make to spark scala and > Sbt in the future > > > > > > One of the suggestions was jenkins but I know the UI isn't great for new > starters so I'd rather avoid it. I've used team city but that was more > focused on dot net development > > > > > > What are people using? > > > > Kind Regards > > Sam >
Spark and continuous integration
Hi Folks,

This is more of a general question: what's everyone using for their CI/CD when it comes to Spark?

We are using PySpark but potentially looking to move to Spark with Scala and SBT in the future.

One of the suggestions was Jenkins, but I know the UI isn't great for new starters so I'd rather avoid it. I've used TeamCity, but that was more focused on .NET development.

What are people using?

Kind Regards
Sam
Re: How to unit test spark streaming?
Hey Kant,

You can use Holden's spark-testing-base. Have a look at some of the specs I wrote here to give you an idea: https://github.com/samelamin/spark-bigquery/blob/master/src/test/scala/com/samelamin/spark/bigquery/BigQuerySchemaSpecs.scala

Basically you abstract your transformations to take in a dataframe and return one, then you assert on the returned df.

Regards
Sam

On Tue, 7 Mar 2017 at 12:05, kant kodali <kanth...@gmail.com> wrote: > Hi All, > > How to unit test spark streaming or spark in general? How do I test the > results of my transformations? Also, more importantly don't we need to > spawn master and worker JVM's either in one or multiple nodes? > > Thanks! > kant >
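The pattern described above — each transformation as a function that takes in a dataframe and returns one, asserted on in tests — can be sketched with plain-Python stand-ins (lists of dicts in place of real DataFrames; with spark-testing-base the same test shape applies to actual DataFrames). The transformation itself is hypothetical:

```python
# Stand-in for a Spark transformation: a pure function from an input
# "dataframe" (here a list of row dicts) to an output one. Keeping the
# transformation free of any session/streaming wiring is what makes it
# unit-testable.

def add_total_column(rows):
    """Hypothetical transformation: derive 'total' from 'price' and 'qty'."""
    return [{**r, "total": r["price"] * r["qty"]} for r in rows]

# The test builds an input, applies the transformation, asserts on output.
input_rows = [{"price": 2.0, "qty": 3}, {"price": 1.5, "qty": 2}]
output_rows = add_total_column(input_rows)

assert [r["total"] for r in output_rows] == [6.0, 3.0]
```

With real Spark the only differences are that the fixture rows are turned into a DataFrame via the shared test SparkSession and the assertion compares collected rows or uses spark-testing-base's DataFrame comparison helpers.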
Re: using spark to load a data warehouse in real time
Hi Adaryl,

Having come from a web background myself, I completely understand your confusion, so let me try to clarify a few things.

First and foremost, Spark is a data processing engine, not a general framework. In the web applications and frameworks world you load the entities, map them to the UI, serve them up to the users, then save whatever you need back to the database via some sort of entity mapping - whether that's an ORM, stored procedures, or any other manner. Spark, as I mentioned, is a data processing engine, so there is no concept of an ORM or data mapper. You can give it the schema of what you expect the data to look like; it also works well with most of the data formats being used in the industry, like CSV, JSON, AVRO and PARQUET, including inferring the schema from the data provided, making it much easier to develop and maintain.

Now as to your question of loading data in real time: it absolutely can be done. Traditionally, data coming in arrives at a location most people call the landing; this is where the extract part of the ETL begins. As Jörn mentioned, Spark Streaming isn't meant to write to a database, but you can write to Kafka or Kinesis to build a pipeline, then have another process consume from them and write to your end datastore.

The creators of Spark realised that your use case is absolutely valid, and almost everyone they talked to said that streaming on its own wasn't enough; for this very reason the concept of structured streaming was brought in. See this blog post from Databricks: https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

You can potentially use the structured streaming APIs to continually read changes from HDFS, or in your case S3, then write them out via JDBC to your end datastore. I have done it before, so I'll give you a few gotchas to be aware of.

The most important one is that your end datastore or data warehouse must support streaming inserts; some are better than others. 
Redshift specifically is really bad when it comes to small, very frequent deltas, which is exactly what high-scale streaming produces.

The second is that structured streaming is still in alpha and the code is marked as experimental. That's not to say it will die the minute you push any load through it; I found that it handled GBs of data well. The pain I found is that, while the underlying goal of structured streaming was to build on the dataframe APIs, hence unifying the batch and stream data types so you only need to learn one, some methods don't yet work on streaming dataframes, such as dropDuplicates.

That's pretty much it. So really it comes down to your use case: if you need the data to be reliable and never go down, then implement Kafka or Kinesis. If it's a proof of concept or you are trying to validate a theory, use structured streaming, as it's much quicker to get going - weeks or months of setup vs a few hours.

I hope I clarified things for you.

Regards
Sam

Sent from my iPhone

On Wed, 1 Mar 2017 at 07:34, Jörn Franke <jornfra...@gmail.com> wrote: I am not sure that Spark Streaming is what you want to do. It is for streaming analytics not for loading in a DWH. You need also define what realtime means and what is needed there - it will differ from client to client significantly. From my experience, just SQL is not enough for the users in the future. Especially large data volumes require much more beyond just aggregations. These may become less useful in context of large data volumes. They have to learn new ways of dealing with the data from a business perspective by employing proper sampling of data from a large dataset, machine learning approaches etc. These are new methods which are not technically driven but business driven. I think it is wrong to assume that users learning new skills is a bad thing; it might be in the future a necessity. 
On 28 Feb 2017, at 23:18, Adaryl Wakefield <adaryl.wakefi...@hotmail.com> wrote: I’m actually trying to come up with a generalized use case that I can take from client to client. We have structured data coming from some application. Instead of dropping it into Hadoop and then using yet another technology to query that data, I just want to dump it into a relational MPP DW so nobody has to learn new skills or new tech just to do some analysis. Everybody and their mom can write SQL. Designing relational databases is a rare skill but not as rare as what is necessary for designing some NoSQL solutions. I’m looking for the fastest path to move a company from batch to real time analytical processing. Adaryl "Bob" Wakefield, MBA Principal Mass Street Analytics, LLC 913.938.6685 www.massstreet.net www.linkedin.com/in/bobwakefieldmba Twitter: @BobLovesData *From:* Mohammad Tariq [mailto:donta...@gmail.com <donta...@gmail.com>] *Sent:* Tuesday, February 28, 2017 12:57 PM *To:* Adaryl Wakefield <adaryl.wakefi...@hotmail.com> *Cc:* user@spark.apache.org *Sub
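One of the gotchas mentioned above — dropDuplicates not yet working on streaming dataframes — means deduplication across micro-batches has to be done by hand. A plain-Python stand-in of the idea (the `record_id` key field is hypothetical, and a real implementation would keep the seen-keys state somewhere durable rather than in memory):

```python
# Stand-in for manual deduplication across streaming micro-batches:
# remember keys already written to the warehouse and drop repeats.

def dedup_batches(batches):
    """Flatten micro-batches, keeping only the first row per record_id."""
    seen = set()
    out = []
    for batch in batches:
        for row in batch:
            key = row["record_id"]
            if key not in seen:
                seen.add(key)
                out.append(row)
    return out

batches = [
    [{"record_id": 1, "v": "a"}, {"record_id": 2, "v": "b"}],
    [{"record_id": 2, "v": "b"}, {"record_id": 3, "v": "c"}],  # id 2 repeats
]
assert [r["record_id"] for r in dedup_batches(batches)] == [1, 2, 3]
```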
Re: Structured Streaming: How to handle bad input
Hi Jayesh,

So you have 2 problems here:

1) Data was loaded in the wrong format
2) Once you have handled the wrong data, the Spark job will continually retry the failed batch

For 2 it's very easy: go into the checkpoint directory, delete that offset manually, and make it seem like it never happened.

Point 1 is a little trickier. If you receive bad data then perhaps your first point of call should be a cleaning process, to ensure your data is at least parsable, then move it to another directory which Spark streaming is looking at. It is unreasonable to expect Spark both to do the streaming and handle bad data for you, yet remain extremely simple and easy to use.

That said, I personally would have a conversation with the provider of the data. In this scenario I just ensure that these providers guarantee the format of the data, whether it's CSV, JSON, AVRO, PARQUET or whatever; I should hope whatever service/company is providing this data is providing it "correctly" to a set definition, otherwise you will have to do a pre-cleaning step.

Perhaps someone else can suggest a better/cleaner approach.

Regards
Sam

On Thu, Feb 23, 2017 at 2:09 PM, JayeshLalwani < jayesh.lalw...@capitalone.com> wrote: > What is a good way to make a Structured Streaming application deal with bad > input? Right now, the problem is that bad input kills the Structured > Streaming application. 
This is highly undesirable, because a Structured > Streaming application has to be always on > > For example, here is a very simple structured streaming program > > > > > Now, I drop in a CSV file with the following data into my bucket > > > > Obviously the data is in the wrong format > > The executor and driver come crashing down > 17/02/23 08:53:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID > 0) > java.lang.NumberFormatException: For input string: "Iron man" > at > java.lang.NumberFormatException.forInputString( > NumberFormatException.java:65) > at java.lang.Integer.parseInt(Integer.java:580) > at java.lang.Integer.parseInt(Integer.java:615) > at scala.collection.immutable.StringLike$class.toInt( > StringLike.scala:272) > at scala.collection.immutable.StringOps.toInt(StringOps.scala:29) > at > org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo( > CSVInferSchema.scala:250) > at > org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$ > csvParser$3.apply(CSVRelation.scala:125) > at > org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$ > csvParser$3.apply(CSVRelation.scala:94) > at > org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$ > buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167) > at > org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$ > buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext( > FileScanRDD.scala:102) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1. 
> nextIterator(FileScanRDD.scala:166) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext( > FileScanRDD.scala:102) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$ > GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator. > hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$ > anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$ > 2.apply(SparkPlan.scala:231) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$ > 2.apply(SparkPlan.scala:225) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$ > 1$$anonfun$apply$25.apply(RDD.scala:826) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$ > 1$$anonfun$apply$25.apply(RDD.scala:826) > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask. > scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$Ta
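The pre-cleaning step suggested in the reply above can be sketched in plain Python. The integer-typed second column mirrors the field that failed to parse "Iron man" in the stack trace; the helper name and routing are hypothetical:

```python
# Hypothetical pre-cleaning pass: validate rows *before* they land in the
# directory the stream reads from, and quarantine unparsable ones so a bad
# row cannot crash the always-on streaming job.

def split_good_bad(lines):
    """Route CSV lines by whether their second column parses as an int."""
    good, bad = [], []
    for line in lines:
        parts = line.split(",")
        try:
            int(parts[1])          # the typed field that must parse
            good.append(line)
        except (ValueError, IndexError):
            bad.append(line)       # quarantine for manual inspection
    return good, bad

lines = ["hulk,10", "Iron man", "thor,7"]
good, bad = split_good_bad(lines)
assert good == ["hulk,10", "thor,7"]
assert bad == ["Iron man"]
```

Only the `good` lines would then be moved into the streamed directory; the `bad` ones go back to the data provider.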
Re: quick question: best to use cluster mode or client mode for production?
I personally use spark-submit, as it's agnostic to which platform your Spark clusters are running on, e.g. EMR, Dataproc, Databricks etc.

On Thu, 23 Feb 2017 at 08:53, nancy henry wrote: > Hi Team, > > I have set of hc.sql("hivequery") kind of scripts which i am running right > now in spark-shell > > How should i schedule it in production > making it spark-shell -i script.scala > or keeping it in jar file through eclipse and use spark-submit deploy mode > cluster? > > which is advisable? >
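A sketch of the cluster-mode spark-submit invocation being discussed (the jar path and class name are placeholders; the command is echoed rather than executed):

```shell
#!/bin/sh
# Placeholder jar and class; the relevant flag is --deploy-mode cluster,
# which runs the driver on the cluster rather than the submitting machine.
APP_JAR="/path/to/hive-queries.jar"
MAIN_CLASS="com.example.HiveQueries"

CMD="spark-submit --deploy-mode cluster --class $MAIN_CLASS $APP_JAR"
echo "$CMD"
```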
Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream
Just doing a bit of research - seems we've been beaten to the punch; there's already a connector you can use here <https://github.com/maropu/spark-kinesis-sql-asl/issues/4> Give it a go and feel free to give the committer feedback, or better yet send some PRs if it needs them :) On Sun, Feb 19, 2017 at 9:23 PM, Sam Elamin <hussam.ela...@gmail.com> wrote: > Hey Neil > > No worries! Happy to help you write it if you want, just link me to the > repo and we can write it together > > Would be fun! > > > Regards > Sam > On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com> > wrote: > >> Thanks for the advice Sam. I will look into implementing a structured >> streaming connector. >> >> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote: >> >> HI Niel, >> >> My advice would be to write a structured streaming connector. The new >> structured streaming APIs were brought in to handle exactly the issues you >> describe >> >> See this blog >> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html> >> >> There isnt a structured streaming connector as of yet, but you can easily >> write one that uses the underlying batch methods to read/write to Kinesis >> >> Have a look at how I wrote my bigquery connector here >> <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we >> get a new connector to a highly used datasource/sink >> >> Hope that helps >> >> Regards >> Sam >> >> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari < >> neil.v.maheshw...@gmail.com> wrote: >> >> Thanks for your response Ayan. >> >> This could be an option. One complication I see with that approach is >> that I do not want to miss any records that are between the data we have >> batched to the data store and the checkpoint. I would still need a >> mechanism for recording the sequence number of the last time the data was >> batched, so I could start the streaming application after that sequence >> number. 
>> >> A similar approach could be to batch our data periodically, recording the >> last sequence number of the batch. Then, fetch data from Kinesis using the >> low level API to read data from the latest sequence number of the batched >> data up until the sequence number of the latest checkpoint from our spark >> app. I could merge batched dataset and the dataset fetched from Kinesis’s >> lower level API, and use that dataset as an RDD to prep the job. >> >> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote: >> >> Hi >> >> AFAIK, Kinesis does not provide any mechanism other than check point to >> restart. That makes sense as it makes it so generic. >> >> Question: why cant you warm up your data from a data store? Say every 30 >> mins you run a job to aggregate your data to a data store for that hour. >> When you restart the streaming app it would read from dynamo check point, >> but it would also preps an initial rdd from data store? >> >> Best >> Ayan >> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari < >> neil.v.maheshw...@gmail.com> wrote: >> >> Hello, >> >> I am building a Spark streaming application that ingests data from an >> Amazon Kinesis stream. My application keeps track of the minimum price over >> a window for groups of similar tickets. When I deploy the application, I >> would like it to start processing at the start of the previous hours data. >> This will warm up the state of the application and allow us to deploy our >> application faster. For example, if I start the application at 3 PM, I >> would like to process the data retained by Kinesis from 2PM to 3PM, and >> then continue receiving data going forward. 
Spark Streaming’s Kinesis >> receiver, which relies on the Amazon Kinesis Client Library, seems to give >> me three options for choosing where to read from the stream: >> >>- read from the latest checkpointed sequence number in Dynamo >>- start from the oldest record in the stream (TRIM_HORIZON shard >>iterator type) >>- start from the most recent record in the stream (LATEST shard >>iterator type) >> >> >> Do you have any suggestions on how we could start our application at a >> specific timestamp or sequence number in the Kinesis stream? Some ideas I >> had were: >> >>- Create a KCL application that fetches the previous hour data and >>writes it to HDFS. We can create an RDD from
Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream
Hey Neil No worries! Happy to help you write it if you want, just link me to the repo and we can write it together Would be fun! Regards Sam On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com> wrote: > Thanks for the advice Sam. I will look into implementing a structured > streaming connector. > > On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote: > > HI Niel, > > My advice would be to write a structured streaming connector. The new > structured streaming APIs were brought in to handle exactly the issues you > describe > > See this blog > <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html> > > There isnt a structured streaming connector as of yet, but you can easily > write one that uses the underlying batch methods to read/write to Kinesis > > Have a look at how I wrote my bigquery connector here > <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we > get a new connector to a highly used datasource/sink > > Hope that helps > > Regards > Sam > > On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari < > neil.v.maheshw...@gmail.com> wrote: > > Thanks for your response Ayan. > > This could be an option. One complication I see with that approach is that > I do not want to miss any records that are between the data we have batched > to the data store and the checkpoint. I would still need a mechanism for > recording the sequence number of the last time the data was batched, so I > could start the streaming application after that sequence number. > > A similar approach could be to batch our data periodically, recording the > last sequence number of the batch. Then, fetch data from Kinesis using the > low level API to read data from the latest sequence number of the batched > data up until the sequence number of the latest checkpoint from our spark > app. 
I could merge batched dataset and the dataset fetched from Kinesis’s > lower level API, and use that dataset as an RDD to prep the job. > > On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote: > > Hi > > AFAIK, Kinesis does not provide any mechanism other than check point to > restart. That makes sense as it makes it so generic. > > Question: why cant you warm up your data from a data store? Say every 30 > mins you run a job to aggregate your data to a data store for that hour. > When you restart the streaming app it would read from dynamo check point, > but it would also preps an initial rdd from data store? > > Best > Ayan > On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari < > neil.v.maheshw...@gmail.com> wrote: > > Hello, > > I am building a Spark streaming application that ingests data from an > Amazon Kinesis stream. My application keeps track of the minimum price over > a window for groups of similar tickets. When I deploy the application, I > would like it to start processing at the start of the previous hours data. > This will warm up the state of the application and allow us to deploy our > application faster. For example, if I start the application at 3 PM, I > would like to process the data retained by Kinesis from 2PM to 3PM, and > then continue receiving data going forward. Spark Streaming’s Kinesis > receiver, which relies on the Amazon Kinesis Client Library, seems to give > me three options for choosing where to read from the stream: > >- read from the latest checkpointed sequence number in Dynamo >- start from the oldest record in the stream (TRIM_HORIZON shard >iterator type) >- start from the most recent record in the stream (LATEST shard >iterator type) > > > Do you have any suggestions on how we could start our application at a > specific timestamp or sequence number in the Kinesis stream? Some ideas I > had were: > >- Create a KCL application that fetches the previous hour data and >writes it to HDFS. 
We can create an RDD from that dataset and initialize >our Spark Streaming job with it. The spark streaming job’s Kinesis receiver >can have the same name as the initial KCL application, and use that >applications checkpoint as the starting point. We’re writing our spark jobs >in Python, so this would require launching the java MultiLang daemon, or >writing that portion of the application in Java/Scala. >- Before the Spark streaming application starts, we could fetch a >shard iterator using the AT_TIMESTAMP shard iterator type. We could record >the sequence number of the first record returned by this iterator, and >create an entry in Dynamo for our application for that sequence number. Our >Kinesis receiver would pick up from this checkpoint. It makes me a little >nervous that we would be faking Kinesis Client Library's protocol by >writing a checkpoint into Dynamo > > > Thanks in advance! > > Neil > > -- > Best Regards, > Ayan Guha > > > >
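The merge step Neil describes — combine the periodically batched dataset with only those records fetched from Kinesis after the batch's last sequence number — can be sketched in plain Python. The record layout (`{"seq": ..., "price": ...}`) is a hypothetical stand-in, not the real Kinesis record shape:

```python
# Sketch of warming up state from a batch plus a low-level Kinesis fetch.
# Field names are made up for illustration.

def merge_batch_with_stream(batched, fetched, last_batched_seq):
    """Keep everything from the batch, plus only the fetched records
    that arrived after the batch's last sequence number, so overlapping
    records are not duplicated."""
    newer = [r for r in fetched if r["seq"] > last_batched_seq]
    return batched + newer

batched = [{"seq": 1, "price": 10}, {"seq": 2, "price": 9}]
fetched = [{"seq": 2, "price": 9}, {"seq": 3, "price": 11}]
merged = merge_batch_with_stream(batched, fetched, last_batched_seq=2)
# merged holds seq 1, 2, 3 exactly once
```

In Spark terms, `merged` would become the initial RDD handed to the streaming job.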
Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream
HI Niel, My advice would be to write a structured streaming connector. The new structured streaming APIs were brought in to handle exactly the issues you describe See this blog <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html> There isnt a structured streaming connector as of yet, but you can easily write one that uses the underlying batch methods to read/write to Kinesis Have a look at how I wrote my bigquery connector here <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we get a new connector to a highly used datasource/sink Hope that helps Regards Sam On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari < neil.v.maheshw...@gmail.com> wrote: > Thanks for your response Ayan. > > This could be an option. One complication I see with that approach is that > I do not want to miss any records that are between the data we have batched > to the data store and the checkpoint. I would still need a mechanism for > recording the sequence number of the last time the data was batched, so I > could start the streaming application after that sequence number. > > A similar approach could be to batch our data periodically, recording the > last sequence number of the batch. Then, fetch data from Kinesis using the > low level API to read data from the latest sequence number of the batched > data up until the sequence number of the latest checkpoint from our spark > app. I could merge batched dataset and the dataset fetched from Kinesis’s > lower level API, and use that dataset as an RDD to prep the job. > > On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote: > > Hi > > AFAIK, Kinesis does not provide any mechanism other than check point to > restart. That makes sense as it makes it so generic. > > Question: why cant you warm up your data from a data store? Say every 30 > mins you run a job to aggregate your data to a data store for that hour. 
> When you restart the streaming app it would read from dynamo check point, > but it would also preps an initial rdd from data store? > > Best > Ayan > On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari < > neil.v.maheshw...@gmail.com> wrote: > >> Hello, >> >> I am building a Spark streaming application that ingests data from an >> Amazon Kinesis stream. My application keeps track of the minimum price over >> a window for groups of similar tickets. When I deploy the application, I >> would like it to start processing at the start of the previous hours data. >> This will warm up the state of the application and allow us to deploy our >> application faster. For example, if I start the application at 3 PM, I >> would like to process the data retained by Kinesis from 2PM to 3PM, and >> then continue receiving data going forward. Spark Streaming’s Kinesis >> receiver, which relies on the Amazon Kinesis Client Library, seems to give >> me three options for choosing where to read from the stream: >> >>- read from the latest checkpointed sequence number in Dynamo >>- start from the oldest record in the stream (TRIM_HORIZON shard >>iterator type) >>- start from the most recent record in the stream (LATEST shard >>iterator type) >> >> >> Do you have any suggestions on how we could start our application at a >> specific timestamp or sequence number in the Kinesis stream? Some ideas I >> had were: >> >>- Create a KCL application that fetches the previous hour data and >>writes it to HDFS. We can create an RDD from that dataset and initialize >>our Spark Streaming job with it. The spark streaming job’s Kinesis >> receiver >>can have the same name as the initial KCL application, and use that >>applications checkpoint as the starting point. We’re writing our spark >> jobs >>in Python, so this would require launching the java MultiLang daemon, or >>writing that portion of the application in Java/Scala. 
>>- Before the Spark streaming application starts, we could fetch a >>shard iterator using the AT_TIMESTAMP shard iterator type. We could record >>the sequence number of the first record returned by this iterator, and >>create an entry in Dynamo for our application for that sequence number. >> Our >>Kinesis receiver would pick up from this checkpoint. It makes me a little >>nervous that we would be faking Kinesis Client Library's protocol by >>writing a checkpoint into Dynamo >> >> >> Thanks in advance! >> >> Neil >> > -- > Best Regards, > Ayan Guha > > >
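The AT_TIMESTAMP idea in the thread boils down to finding the sequence number of the first record at or after a cutoff timestamp, then recording it as the starting checkpoint. A plain-Python sketch with made-up record fields (`seq`, `ts`), not the actual Kinesis API:

```python
def first_seq_at_or_after(records, start_ts):
    """Return the sequence number of the earliest record whose timestamp
    is at or after start_ts, or None if no such record exists."""
    eligible = [r for r in records if r["ts"] >= start_ts]
    if not eligible:
        return None
    return min(eligible, key=lambda r: r["seq"])["seq"]

stream = [{"seq": 10, "ts": 100}, {"seq": 11, "ts": 160}, {"seq": 12, "ts": 200}]
checkpoint_seq = first_seq_at_or_after(stream, start_ts=150)
# checkpoint_seq == 11; this would be the value written to Dynamo
```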
Re: Debugging Spark application
I recommend running Spark in local mode when you're first debugging your code, just to understand what's happening and step through it; you'll perhaps catch a few errors when you first start off. I personally use IntelliJ because it's my preference. You can follow this guide: http://www.bigendiandata.com/2016-08-26-How-to-debug-remote-spark-jobs-with-IntelliJ/ Although it's for IntelliJ, you can apply the same concepts to Eclipse *I think* Regards Sam On Thu, 16 Feb 2017 at 22:00, Md. Rezaul Karim < rezaul.ka...@insight-centre.org> wrote: > Hi, > > I was looking for some URLs/documents for getting started on debugging > Spark applications. > > I prefer developing Spark applications with Scala on Eclipse and then > package the application jar before submitting. > > > > Kind regards, > Reza > > > >
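For stepping through a job in an IDE, guides like the one linked above have the driver JVM start a JDWP agent and then attach a remote-debug session from IntelliJ or Eclipse. A sketch of the invocation — the jar name and port are placeholders, and this is a configuration fragment, not a tested command:

```shell
# Run the driver locally with a debug agent; the IDE attaches on port 5005
# via a "Remote" debug configuration. suspend=y pauses startup until the
# debugger connects.
spark-submit \
  --master "local[*]" \
  --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" \
  your_app.jar
```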
Re: Enrichment with static tables
You can do a join or a union to combine all the dataframes into one fat dataframe, or do a select on the columns you want to produce your transformed dataframe. Not sure if I understand the question, though; if the goal is just an end-state transformed dataframe, that can easily be done. Regards Sam On Wed, Feb 15, 2017 at 6:34 PM, Gaurav Agarwal <gaurav130...@gmail.com> wrote: > Hello > > We want to enrich our spark RDD loaded with multiple Columns and multiple > Rows . This need to be enriched with 3 different tables that i loaded 3 > different spark dataframe . Can we write some logic in spark so i can > enrich my spark RDD with different stattic tables. > > Thanks > >
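The enrichment being described — each row joined against a static lookup table by key — reduces to a keyed lookup. A plain-Python sketch with hypothetical field names (in Spark proper this would be a left join such as `df.join(staticDf, "country_code", "left")`):

```python
# Static lookup table standing in for one of the three reference dataframes.
lookup_country = {"US": "United States", "DE": "Germany"}

def enrich(rows, lookup):
    """Add a country_name column to each row by looking up country_code."""
    out = []
    for row in rows:
        enriched = dict(row)  # copy so the input rows stay untouched
        enriched["country_name"] = lookup.get(row["country_code"])
        out.append(enriched)
    return out

rows = [{"id": 1, "country_code": "US"}, {"id": 2, "country_code": "DE"}]
enriched = enrich(rows, lookup_country)
```

Repeating the same step per reference table gives the "enrich with 3 different tables" flow from the question.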
Re: Dealing with missing columns in SPARK SQL in JSON
Ah, if that's the case then you might need to define the schema beforehand. Either that, or if you want to infer it, ensure a JSON file exists with the right schema so Spark infers the right columns, essentially making both files one dataframe, if that makes sense On Tue, Feb 14, 2017 at 3:04 PM, Aseem Bansal <asmbans...@gmail.com> wrote: > Sorry if I trivialized the example. It is the same kind of file and > sometimes it could have "a", sometimes "b", sometimes both. I just don't > know. That is what I meant by missing columns. > > It would be good if I read any of the JSON and if I do spark sql and it > gave me > > for json1.json > > a | b > 1 | null > > for json2.json > > a | b > null | 2 > > > On Tue, Feb 14, 2017 at 8:13 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > >> I may be missing something super obvious here but can't you combine them >> into a single dataframe. Left join perhaps? >> >> Try writing it in sql " select a from json1 and b from josn2"then run >> explain to give you a hint to how to do it in code >> >> Regards >> Sam >> On Tue, 14 Feb 2017 at 14:30, Aseem Bansal <asmbans...@gmail.com> wrote: >> >>> Say I have two files containing single rows >>> >>> json1.json >>> >>> {"a": 1} >>> >>> json2.json >>> >>> {"b": 2} >>> >>> I read in this json file using spark's API into a dataframe one at a >>> time. So I have >>> >>> Dataset json1DF >>> and >>> Dataset json2DF >>> >>> If I run "select a, b from __THIS__" in a SQLTransformer then I will get >>> an exception as for json1DF does not have "b" and json2DF does not have "a" >>> >>> How could I handle this situation with missing columns in JSON? >>> >> >
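Defining the schema beforehand amounts to forcing every record onto the same set of columns and filling anything missing with null. A plain-Python sketch of that idea, using the `a`/`b` columns from the json1/json2 example in the thread (`None` standing in for Spark's null):

```python
# The agreed-upon schema: every output row must have exactly these columns.
SCHEMA = ["a", "b"]

def normalize(record):
    """Project a record onto SCHEMA, filling missing columns with None."""
    return {col: record.get(col) for col in SCHEMA}

rows = [normalize({"a": 1}), normalize({"b": 2})]
# rows == [{"a": 1, "b": None}, {"a": None, "b": 2}]
```

In Spark this is what passing an explicit `StructType` to the JSON reader achieves: both files land in one dataframe with `a` and `b` present, null where absent.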
Re: Dealing with missing columns in SPARK SQL in JSON
I may be missing something super obvious here but can't you combine them into a single dataframe. Left join perhaps? Try writing it in sql " select a from json1 and b from josn2"then run explain to give you a hint to how to do it in code Regards Sam On Tue, 14 Feb 2017 at 14:30, Aseem Bansal <asmbans...@gmail.com> wrote: > Say I have two files containing single rows > > json1.json > > {"a": 1} > > json2.json > > {"b": 2} > > I read in this json file using spark's API into a dataframe one at a time. > So I have > > Dataset json1DF > and > Dataset json2DF > > If I run "select a, b from __THIS__" in a SQLTransformer then I will get > an exception as for json1DF does not have "b" and json2DF does not have "a" > > How could I handle this situation with missing columns in JSON? >
Re: how to fix the order of data
It's because foreach runs in parallel across the RDD's partitions, so the order in which the elements are printed is nondeterministic. If you need a deterministic order, collect and sort, e.g. input.toDF().sort("value").collect(), or if you do not want to convert to a dataframe you can use *sortByKey*([*ascending*], [*numTasks*]) on a pair RDD. Regards Sam On Tue, Feb 14, 2017 at 11:41 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote: >HI all, > the belowing is my test code. I found the output of val > input is different. how do i fix the order please? > > scala> val input = sc.parallelize( Array(1,2,3)) > input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at > parallelize at :24 > > scala> input.foreach(print) > 132 > scala> input.foreach(print) > 213 > scala> input.foreach(print) > 312
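A minimal illustration of why the printed order varies and how sorting restores determinism — the partition contents are made up, and the "arbitrary" concatenation stands in for whichever partition happens to print first:

```python
# Three partitions of the same logical dataset; each prints independently,
# so the observed order depends on scheduling, not on the data.
partitions = [[2], [3], [1]]

arbitrary_order = [x for part in partitions for x in part]  # one possible run
deterministic = sorted(arbitrary_order)                     # always [1, 2, 3]
```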
Re: Etl with spark
Yup I ended up doing just that thank you both On Sun, 12 Feb 2017 at 18:33, Miguel Morales <therevolti...@gmail.com> wrote: > You can parallelize the collection of s3 keys and then pass that to your > map function so that files are read in parallel. > > Sent from my iPhone > > On Feb 12, 2017, at 9:41 AM, Sam Elamin <hussam.ela...@gmail.com> wrote: > > thanks Ayan but i was hoping to remove the dependency on a file and just > use in memory list or dictionary > > So from the reading I've done today it seems.the concept of a bespoke > async method doesn't really apply in spsrk since the cluster deals with > distributing the work load > > > Am I mistaken? > > Regards > Sam > On Sun, 12 Feb 2017 at 12:13, ayan guha <guha.a...@gmail.com> wrote: > > You can store the list of keys (I believe you use them in source file > path, right?) in a file, one key per line. Then you can read the file using > sc.textFile (So you will get a RDD of file paths) and then apply your > function as a map. > > r = sc.textFile(list_file).map(your_function) > > HTH > > On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > Hey folks > > Really simple question here. I currently have an etl pipeline that reads > from s3 and saves the data to an endstore > > > I have to read from a list of keys in s3 but I am doing a raw extract then > saving. Only some of the extracts have a simple transformation but overall > the code looks the same > > > I abstracted away this logic into a method that takes in an s3 path does > the common transformations and saves to source > > > But the job takes about 10 mins or so because I'm iteratively going down a > list of keys > > Is it possible to asynchronously do this? > > FYI I'm using spark.read.json to read from s3 because it infers my schema > > Regards > Sam > > > > > -- > Best Regards, > Ayan Guha > >
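Miguel's suggestion — parallelize the collection of keys and map the extract function over it — has a driver-side analogue that can be sketched with a thread pool. The key names and the per-key function here are hypothetical stand-ins; in Spark proper you would `sc.parallelize(keys)` and map on the executors instead:

```python
from concurrent.futures import ThreadPoolExecutor

def extract_and_save(key):
    # Stand-in for: read s3://bucket/<key>, apply the common
    # transformations, save to the end store.
    return f"processed {key}"

keys = ["2017/02/12/a.json", "2017/02/12/b.json"]

# Process keys concurrently instead of iterating one at a time.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(extract_and_save, keys))
```

`pool.map` preserves the input order of `keys` even though the work runs concurrently.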
Re: Etl with spark
Thanks Ayan, but I was hoping to remove the dependency on a file and just use an in-memory list or dictionary. From the reading I've done today, it seems the concept of a bespoke async method doesn't really apply in Spark, since the cluster deals with distributing the workload. Am I mistaken? Regards Sam On Sun, 12 Feb 2017 at 12:13, ayan guha <guha.a...@gmail.com> wrote: You can store the list of keys (I believe you use them in source file path, right?) in a file, one key per line. Then you can read the file using sc.textFile (So you will get a RDD of file paths) and then apply your function as a map. r = sc.textFile(list_file).map(your_function) HTH On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com> wrote: Hey folks Really simple question here. I currently have an etl pipeline that reads from s3 and saves the data to an endstore I have to read from a list of keys in s3 but I am doing a raw extract then saving. Only some of the extracts have a simple transformation but overall the code looks the same I abstracted away this logic into a method that takes in an s3 path does the common transformations and saves to source But the job takes about 10 mins or so because I'm iteratively going down a list of keys Is it possible to asynchronously do this? FYI I'm using spark.read.json to read from s3 because it infers my schema Regards Sam -- Best Regards, Ayan Guha
Etl with spark
Hey folks Really simple question here. I currently have an ETL pipeline that reads from S3 and saves the data to an end store. I have to read from a list of keys in S3, but I am doing a raw extract then saving. Only some of the extracts have a simple transformation; overall the code looks the same. I abstracted this logic away into a method that takes in an S3 path, does the common transformations and saves the result. But the job takes about 10 minutes or so because I'm iterating down a list of keys. Is it possible to do this asynchronously? FYI I'm using spark.read.json to read from S3 because it infers my schema Regards Sam
Re: [Structured Streaming] Using File Sink to store to hive table.
Here's a link to the thread http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-Dropping-Duplicates-td20884.html On Sat, 11 Feb 2017 at 08:47, Sam Elamin <hussam.ela...@gmail.com> wrote: > Hey Egor > > > You can use for each writer or you can write a custom sink. I personally > went with a custom sink since I get a dataframe per batch > > > https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala > > You can have a look at how I implemented something similar to file sink > that in the event if a failure skips batches already written > > > Also have a look at Micheals reply to me a few days ago on exactly the > same topic. The email subject was called structured streaming. Dropping > duplicates > > > Regards > > Sam > > On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski <ja...@japila.pl> wrote: > > "Something like that" I've never tried it out myself so I'm only > guessing having a brief look at the API. > > Pozdrawiam, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov <pahomov.e...@gmail.com> > wrote: > > Jacek, so I create cache in ForeachWriter, in all "process()" I write to > it > > and on close I flush? Something like that? > > > > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>: > >> > >> Hi, > >> > >> Yes, that's ForeachWriter. > >> > >> Yes, it works with element by element. You're looking for mapPartition > >> and ForeachWriter has partitionId that you could use to implement a > >> similar thing. 
> >> > >> Pozdrawiam, > >> Jacek Laskowski > >> > >> https://medium.com/@jaceklaskowski/ > >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark > >> Follow me at https://twitter.com/jaceklaskowski > >> > >> > >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com> > >> wrote: > >> > Jacek, you mean > >> > > >> > > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter > >> > ? I do not understand how to use it, since it passes every value > >> > separately, > >> > not every partition. And addding to table value by value would not > work > >> > > >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>: > >> >> > >> >> Hi, > >> >> > >> >> Have you considered foreach sink? > >> >> > >> >> Jacek > >> >> > >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com> > wrote: > >> >>> > >> >>> Hi, I'm thinking of using Structured Streaming instead of old > >> >>> streaming, > >> >>> but I need to be able to save results to Hive table. Documentation > for > >> >>> file > >> >>> sink > >> >>> > >> >>> says( > http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks > ): > >> >>> "Supports writes to partitioned tables. ". But being able to write > to > >> >>> partitioned directories is not enough to write to the table: someone > >> >>> needs > >> >>> to write to Hive metastore. How can I use Structured Streaming and > >> >>> write to > >> >>> Hive table? > >> >>> > >> >>> -- > >> >>> Sincerely yours > >> >>> Egor Pakhomov > >> > > >> > > >> > > >> > > >> > -- > >> > Sincerely yours > >> > Egor Pakhomov > > > > > > > > > > -- > > Sincerely yours > > Egor Pakhomov > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
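The cache-then-flush pattern Jacek and Egor converge on — create a buffer in open(), append in process(), write everything out in close() — can be sketched outside Spark. This class only mirrors the ForeachWriter lifecycle; it is hypothetical Python with a list as the sink, not the actual Spark API:

```python
class BufferedWriter:
    """Buffer a partition's rows and flush them in one shot on close,
    instead of writing value by value."""

    def open(self, partition_id, epoch_id):
        self.buffer = []
        return True  # accept this partition

    def process(self, row):
        self.buffer.append(row)  # cache rather than write immediately

    def close(self, error, sink):
        if error is None:
            sink.extend(self.buffer)  # flush the whole partition at once

sink = []
w = BufferedWriter()
w.open(partition_id=0, epoch_id=0)
for row in [1, 2, 3]:
    w.process(row)
w.close(None, sink)
# sink == [1, 2, 3]
```

One buffer exists per partition, which is why partitionId is enough to emulate a per-partition write even though process() still sees one element at a time.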
Re: [Structured Streaming] Using File Sink to store to hive table.
Hey Egor You can use for each writer or you can write a custom sink. I personally went with a custom sink since I get a dataframe per batch https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala You can have a look at how I implemented something similar to file sink that in the event if a failure skips batches already written Also have a look at Micheals reply to me a few days ago on exactly the same topic. The email subject was called structured streaming. Dropping duplicates Regards Sam On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski <ja...@japila.pl> wrote: "Something like that" I've never tried it out myself so I'm only guessing having a brief look at the API. Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov <pahomov.e...@gmail.com> wrote: > Jacek, so I create cache in ForeachWriter, in all "process()" I write to it > and on close I flush? Something like that? > > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>: >> >> Hi, >> >> Yes, that's ForeachWriter. >> >> Yes, it works with element by element. You're looking for mapPartition >> and ForeachWriter has partitionId that you could use to implement a >> similar thing. >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://medium.com/@jaceklaskowski/ >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark >> Follow me at https://twitter.com/jaceklaskowski >> >> >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com> >> wrote: >> > Jacek, you mean >> > >> > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter >> > ? I do not understand how to use it, since it passes every value >> > separately, >> > not every partition. 
And addding to table value by value would not work >> > >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>: >> >> >> >> Hi, >> >> >> >> Have you considered foreach sink? >> >> >> >> Jacek >> >> >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com> wrote: >> >>> >> >>> Hi, I'm thinking of using Structured Streaming instead of old >> >>> streaming, >> >>> but I need to be able to save results to Hive table. Documentation for >> >>> file >> >>> sink >> >>> >> >>> says( http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks ): >> >>> "Supports writes to partitioned tables. ". But being able to write to >> >>> partitioned directories is not enough to write to the table: someone >> >>> needs >> >>> to write to Hive metastore. How can I use Structured Streaming and >> >>> write to >> >>> Hive table? >> >>> >> >>> -- >> >>> Sincerely yours >> >>> Egor Pakhomov >> > >> > >> > >> > >> > -- >> > Sincerely yours >> > Egor Pakhomov > > > > > -- > Sincerely yours > Egor Pakhomov - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Structured Streaming. S3 To Google BigQuery
Hi All Thank you all for the amazing support! I have written a BigQuery connector for structured streaming that you can find here <https://github.com/samelamin/spark-bigquery> I just tweeted <https://twitter.com/samelamin/status/829477884024782852> about it and would really appreciate it if you retweeted when you get a chance. The more people who know about it and use it, the more feedback I can get to make the connector better! Of course, PRs and feedback are always welcome :) Thanks again! Regards Sam
Re: specifing schema on dataframe
Ah ok Thanks for clearing it up Ayan! i will give that a go Thank you all for your help, this mailing list is awesome! On Mon, Feb 6, 2017 at 9:07 AM, ayan guha <guha.a...@gmail.com> wrote: > If I am not missing anything here, "So I know which columns are numeric > and which arent because I have a StructType and all the internal > StructFields will tell me which ones have a DataType which is numeric and > which arent" will lead to getting to a list of fields which should be > numeric. > > Essentially, You will create a list of numeric fields from your > "should-be" struct type. Then you will load your raw data using built-in > json reader. At this point, your data have a wrong schema. Now, you will > need to correct it. How? You will loop over the list of numeric fields (or, > you can do it directly on the struct type), and try to match the type. If > you find a mismatch, you'd add a withColumn clause to cast to the correct > data type (from your "should-be" struct). > > HTH? > > Best > Ayan > > On Mon, Feb 6, 2017 at 8:00 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > >> Yup sorry I should have explained myself better >> >> So I know which columns are numeric and which arent because I have a >> StructType and all the internal StructFields will tell me which ones have a >> DataType which is numeric and which arent >> >> So assuming I have a json string which has double quotes on numbers when >> it shouldnt, and I have the correct schema in a struct type >> >> >> how can I iterate over them to programatically create the new dataframe >> in the correct format >> >> do i iterate over the columns in the StructType? or iterate over the >> columns in the dataframe and try to match them with the StructType? >> >> I hope I cleared things up, What I wouldnt do for a drawing board right >> now! 
>> >> >> On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote: >> >>> UmmI think the premise is you need to "know" beforehand which >>> columns are numeric.Unless you know it, how would you apply the schema? >>> >>> On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com> >>> wrote: >>> >>>> Thanks ayan but I meant how to derive the list automatically >>>> >>>> In your example you are specifying the numeric columns and I would like >>>> it to be applied to any schema if that makes sense >>>> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote: >>>> >>>>> SImple (pyspark) example: >>>>> >>>>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json") >>>>> >>> df.printSchema() >>>>> root >>>>> |-- customerid: string (nullable = true) >>>>> |-- foo: string (nullable = true) >>>>> >>>>> >>> numeric_field_list = ['customerid'] >>>>> >>>>> >>> for k in numeric_field_list: >>>>> ... df = df.withColumn(k,df[k].cast("long")) >>>>> ... >>>>> >>> df.printSchema() >>>>> root >>>>> |-- customerid: long (nullable = true) >>>>> |-- foo: string (nullable = true) >>>>> >>>>> >>>>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com> >>>>> wrote: >>>>> >>>>> Ok thanks Micheal! >>>>> >>>>> >>>>> Can I get an idea on where to start? Assuming I have the end schema >>>>> and the current dataframe... >>>>> How can I loop through it and create a new dataframe using the >>>>> WithColumn? >>>>> >>>>> >>>>> Am I iterating through the dataframe or the schema? >>>>> >>>>> I'm assuming it's easier to iterate through the columns in the old df. >>>>> For each column cast it correctly and generate a new df? >>>>> >>>>> >>>>> Would you recommend that? >>>>> >>>>> Regards >>>>> Sam >>>>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com> >>>>> wrote: >>>>> >>>>> If you already have the expected schema, and you know that all numbers >>>>> will always be formatted as strings in the input JSON, you could probably
Re: specifing schema on dataframe
Yup sorry I should have explained myself better So I know which columns are numeric and which arent because I have a StructType and all the internal StructFields will tell me which ones have a DataType which is numeric and which arent So assuming I have a json string which has double quotes on numbers when it shouldnt, and I have the correct schema in a struct type how can I iterate over them to programatically create the new dataframe in the correct format do i iterate over the columns in the StructType? or iterate over the columns in the dataframe and try to match them with the StructType? I hope I cleared things up, What I wouldnt do for a drawing board right now! On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote: > UmmI think the premise is you need to "know" beforehand which columns > are numeric.Unless you know it, how would you apply the schema? > > On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > >> Thanks ayan but I meant how to derive the list automatically >> >> In your example you are specifying the numeric columns and I would like >> it to be applied to any schema if that makes sense >> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote: >> >>> SImple (pyspark) example: >>> >>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json") >>> >>> df.printSchema() >>> root >>> |-- customerid: string (nullable = true) >>> |-- foo: string (nullable = true) >>> >>> >>> numeric_field_list = ['customerid'] >>> >>> >>> for k in numeric_field_list: >>> ... df = df.withColumn(k,df[k].cast("long")) >>> ... >>> >>> df.printSchema() >>> root >>> |-- customerid: long (nullable = true) >>> |-- foo: string (nullable = true) >>> >>> >>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com> >>> wrote: >>> >>> Ok thanks Micheal! >>> >>> >>> Can I get an idea on where to start? Assuming I have the end schema and >>> the current dataframe... 
>>> How can I loop through it and create a new dataframe using the >>> WithColumn? >>> >>> >>> Am I iterating through the dataframe or the schema? >>> >>> I'm assuming it's easier to iterate through the columns in the old df. >>> For each column cast it correctly and generate a new df? >>> >>> >>> Would you recommend that? >>> >>> Regards >>> Sam >>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com> >>> wrote: >>> >>> If you already have the expected schema, and you know that all numbers >>> will always be formatted as strings in the input JSON, you could probably >>> derive this list automatically. >>> >>> Wouldn't it be simpler to just regex replace the numbers to remove the >>> quotes? >>> >>> >>> I think this is likely to be a slower and less robust solution. You >>> would have to make sure that you got all the corner cases right (i.e. >>> escaping and what not). >>> >>> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com> >>> wrote: >>> >>> I see so for the connector I need to pass in an array/list of numerical >>> columns? >>> >>> Wouldnt it be simpler to just regex replace the numbers to remove the >>> quotes? >>> >>> >>> Regards >>> Sam >>> >>> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust < >>> mich...@databricks.com> wrote: >>> >>> Specifying the schema when parsing JSON will only let you pick between >>> similar datatypes (i.e should this be a short, long float, double etc). It >>> will not let you perform conversions like string <-> number. This has to >>> be done with explicit casts after the data has been loaded. >>> >>> I think you can make a solution that uses select or withColumn generic. >>> Just load the dataframe with a "parse schema" that treats numbers as >>> strings. Then construct a list of columns that should be numbers and apply >>> the necessary conversions. 
>>> >>> import org.apache.spark.sql.functions.col >>> var df = spark.read.schema(parseSchema).json("...") >>> numericColumns.foreach { columnName => >>> df = df.withColumn(
Re: specifing schema on dataframe
Thanks ayan but I meant how to derive the list automatically In your example you are specifying the numeric columns and I would like it to be applied to any schema if that makes sense On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote: > SImple (pyspark) example: > > >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json") > >>> df.printSchema() > root > |-- customerid: string (nullable = true) > |-- foo: string (nullable = true) > > >>> numeric_field_list = ['customerid'] > > >>> for k in numeric_field_list: > ... df = df.withColumn(k,df[k].cast("long")) > ... > >>> df.printSchema() > root > |-- customerid: long (nullable = true) > |-- foo: string (nullable = true) > > > On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > Ok thanks Micheal! > > > Can I get an idea on where to start? Assuming I have the end schema and > the current dataframe... > How can I loop through it and create a new dataframe using the WithColumn? > > > Am I iterating through the dataframe or the schema? > > I'm assuming it's easier to iterate through the columns in the old df. For > each column cast it correctly and generate a new df? > > > Would you recommend that? > > Regards > Sam > On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com> > wrote: > > If you already have the expected schema, and you know that all numbers > will always be formatted as strings in the input JSON, you could probably > derive this list automatically. > > Wouldn't it be simpler to just regex replace the numbers to remove the > quotes? > > > I think this is likely to be a slower and less robust solution. You would > have to make sure that you got all the corner cases right (i.e. escaping > and what not). > > On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > I see so for the connector I need to pass in an array/list of numerical > columns? 
> > Wouldnt it be simpler to just regex replace the numbers to remove the > quotes? > > > Regards > Sam > > On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com> > wrote: > > Specifying the schema when parsing JSON will only let you pick between > similar datatypes (i.e should this be a short, long float, double etc). It > will not let you perform conversions like string <-> number. This has to > be done with explicit casts after the data has been loaded. > > I think you can make a solution that uses select or withColumn generic. > Just load the dataframe with a "parse schema" that treats numbers as > strings. Then construct a list of columns that should be numbers and apply > the necessary conversions. > > import org.apache.spark.sql.functions.col > var df = spark.read.schema(parseSchema).json("...") > numericColumns.foreach { columnName => > df = df.withColumn(columnName, col(columnName).cast("long")) > } > > > > On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > Thanks Micheal > > I've been spending the past few days researching this > > The problem is the generated json has double quotes on fields that are > numbers because the producing datastore doesn't want to lose precision > > I can change the data type true but that would be on specific to a job > rather than a generic streaming job. I'm writing a structured streaming > connector and I have the schema the generated dataframe should match. > > Unfortunately using withColumn won't help me here since the solution needs > to be generic > > To summarise assume I have the following json > > [{ > "customerid": "535137", > "foo": "bar" > }] > > > and I know the schema should be: > > StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true))) > > Whats the best way of solving this? 
> > My current approach is to iterate over the JSON and identify which fields > are numbers and which arent then recreate the json > > But to be honest that doesnt seem like the cleanest approach, so happy for > advice on this > > Regards > Sam > > On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com> > wrote: > > -dev > > You can use withColumn to change the type after the data has been loaded > <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html> > . >
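The generic derivation the thread is asking for — pick out every column whose parsed type is string but whose expected type is numeric — can be sketched without Spark at all. A minimal illustrative sketch (not from the thread; the plain-dict schema representation and function name are assumptions — in Spark you would build these dicts from `df.dtypes` and the expected StructType, then loop with `withColumn`/`cast` as in Michael's snippet):

```python
def columns_needing_cast(parsed_schema, target_schema):
    """Return [(column, target_type)] for columns that were parsed as
    'string' but are expected to be numeric in the target schema."""
    numeric_types = {"short", "integer", "long", "float", "double"}
    return [
        (name, target_schema[name])
        for name, parsed_type in parsed_schema.items()
        if parsed_type == "string" and target_schema.get(name) in numeric_types
    ]

# Mirrors the thread's example: customerid arrives as a quoted string
# but the known schema says it should be a long.
parsed = {"customerid": "string", "foo": "string"}
target = {"customerid": "long", "foo": "string"}
print(columns_needing_cast(parsed, target))  # [('customerid', 'long')]
```

The point is that nothing job-specific is hard-coded: the cast list falls out of diffing the two schemas, so the same code works for any schema.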
Re: specifying schema on dataframe
Ok thanks Micheal! Can I get an idea on where to start? Assuming I have the end schema and the current dataframe... How can I loop through it and create a new dataframe using the WithColumn? Am I iterating through the dataframe or the schema? I'm assuming it's easier to iterate through the columns in the old df. For each column cast it correctly and generate a new df? Would you recommend that? Regards Sam On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com> wrote: > If you already have the expected schema, and you know that all numbers > will always be formatted as strings in the input JSON, you could probably > derive this list automatically. > > Wouldn't it be simpler to just regex replace the numbers to remove the > quotes? > > > I think this is likely to be a slower and less robust solution. You would > have to make sure that you got all the corner cases right (i.e. escaping > and what not). > > On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > I see so for the connector I need to pass in an array/list of numerical > columns? > > Wouldnt it be simpler to just regex replace the numbers to remove the > quotes? > > > Regards > Sam > > On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com> > wrote: > > Specifying the schema when parsing JSON will only let you pick between > similar datatypes (i.e should this be a short, long float, double etc). It > will not let you perform conversions like string <-> number. This has to > be done with explicit casts after the data has been loaded. > > I think you can make a solution that uses select or withColumn generic. > Just load the dataframe with a "parse schema" that treats numbers as > strings. Then construct a list of columns that should be numbers and apply > the necessary conversions. 
> > import org.apache.spark.sql.functions.col > var df = spark.read.schema(parseSchema).json("...") > numericColumns.foreach { columnName => > df = df.withColumn(columnName, col(columnName).cast("long")) > } > > > > On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > Thanks Micheal > > I've been spending the past few days researching this > > The problem is the generated json has double quotes on fields that are > numbers because the producing datastore doesn't want to lose precision > > I can change the data type true but that would be on specific to a job > rather than a generic streaming job. I'm writing a structured streaming > connector and I have the schema the generated dataframe should match. > > Unfortunately using withColumn won't help me here since the solution needs > to be generic > > To summarise assume I have the following json > > [{ > "customerid": "535137", > "foo": "bar" > }] > > > and I know the schema should be: > > StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true))) > > Whats the best way of solving this? > > My current approach is to iterate over the JSON and identify which fields > are numbers and which arent then recreate the json > > But to be honest that doesnt seem like the cleanest approach, so happy for > advice on this > > Regards > Sam > > On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com> > wrote: > > -dev > > You can use withColumn to change the type after the data has been loaded > <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html> > . > > On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > Hi Direceu > > Thanks your right! 
that did work > > > But now im facing an even bigger problem since i dont have access to > change the underlying data, I just want to apply a schema over something > that was written via the sparkContext.newAPIHadoopRDD > > Basically I am reading in a RDD[JsonObject] and would like to convert it > into a dataframe which I pass the schema into > > Whats the best way to do this? > > I doubt removing all the quotes in the JSON is the best solution is it? > > Regards > Sam > > On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho < > dirceu.semigh...@gmail.com> wrote: > > Hi Sam > Remove the " from the number that it will work > > Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com> > escreveu: > > Hi All > > I would like to specify a schema when reading fro
Re: specifying schema on dataframe
I see so for the connector I need to pass in an array/list of numerical columns? Wouldnt it be simpler to just regex replace the numbers to remove the quotes? Regards Sam On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com> wrote: > Specifying the schema when parsing JSON will only let you pick between > similar datatypes (i.e should this be a short, long float, double etc). It > will not let you perform conversions like string <-> number. This has to > be done with explicit casts after the data has been loaded. > > I think you can make a solution that uses select or withColumn generic. > Just load the dataframe with a "parse schema" that treats numbers as > strings. Then construct a list of columns that should be numbers and apply > the necessary conversions. > > import org.apache.spark.sql.functions.col > var df = spark.read.schema(parseSchema).json("...") > numericColumns.foreach { columnName => > df = df.withColumn(columnName, col(columnName).cast("long")) > } > > > > On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > >> Thanks Micheal >> >> I've been spending the past few days researching this >> >> The problem is the generated json has double quotes on fields that are >> numbers because the producing datastore doesn't want to lose precision >> >> I can change the data type true but that would be on specific to a job >> rather than a generic streaming job. I'm writing a structured streaming >> connector and I have the schema the generated dataframe should match. >> >> Unfortunately using withColumn won't help me here since the solution >> needs to be generic >> >> To summarise assume I have the following json >> >> [{ >> "customerid": "535137", >> "foo": "bar" >> }] >> >> >> and I know the schema should be: >> StructType(Array(StructField("customerid",LongType,true),Str >> uctField("foo",StringType,true))) >> >> Whats the best way of solving this? 
>> >> My current approach is to iterate over the JSON and identify which fields >> are numbers and which arent then recreate the json >> >> But to be honest that doesnt seem like the cleanest approach, so happy >> for advice on this >> >> Regards >> Sam >> >> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com> >> wrote: >> >>> -dev >>> >>> You can use withColumn to change the type after the data has been loaded >>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html> >>> . >>> >>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com> >>> wrote: >>> >>> Hi Direceu >>> >>> Thanks your right! that did work >>> >>> >>> But now im facing an even bigger problem since i dont have access to >>> change the underlying data, I just want to apply a schema over something >>> that was written via the sparkContext.newAPIHadoopRDD >>> >>> Basically I am reading in a RDD[JsonObject] and would like to convert it >>> into a dataframe which I pass the schema into >>> >>> Whats the best way to do this? >>> >>> I doubt removing all the quotes in the JSON is the best solution is it? >>> >>> Regards >>> Sam >>> >>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho < >>> dirceu.semigh...@gmail.com> wrote: >>> >>> Hi Sam >>> Remove the " from the number that it will work >>> >>> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com> >>> escreveu: >>> >>> Hi All >>> >>> I would like to specify a schema when reading from a json but when >>> trying to map a number to a Double it fails, I tried FloatType and IntType >>> with no joy! 
>>> >>> >>> When inferring the schema customer id is set to String, and I would like >>> to cast it as Double >>> >>> so df1 is corrupted while df2 shows >>> >>> >>> Also FYI I need this to be generic as I would like to apply it to any >>> json, I specified the below schema as an example of the issue I am facing >>> >>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, >>> DoubleType,FloatType, StructType, LongType,DecimalType} >>> val testSchema = StructType(Array(StructField("customerid",DoubleType))) >>> val df1 = >>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}"""))) >>> val df2 = >>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}"""))) >>> df1.show(1) >>> df2.show(1) >>> >>> >>> Any help would be appreciated, I am sure I am missing something obvious >>> but for the life of me I cant tell what it is! >>> >>> >>> Kind Regards >>> Sam >>> >>> >>> >>> >
Re: specifying schema on dataframe
Thanks Micheal I've been spending the past few days researching this The problem is the generated json has double quotes on fields that are numbers because the producing datastore doesn't want to lose precision I can change the data type true but that would be on specific to a job rather than a generic streaming job. I'm writing a structured streaming connector and I have the schema the generated dataframe should match. Unfortunately using withColumn won't help me here since the solution needs to be generic To summarise assume I have the following json [{ "customerid": "535137", "foo": "bar" }] and I know the schema should be: StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true))) Whats the best way of solving this? My current approach is to iterate over the JSON and identify which fields are numbers and which arent then recreate the json But to be honest that doesnt seem like the cleanest approach, so happy for advice on this Regards Sam On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com> wrote: > -dev > > You can use withColumn to change the type after the data has been loaded > <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html> > . > > On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com> > wrote: > > Hi Direceu > > Thanks your right! that did work > > > But now im facing an even bigger problem since i dont have access to > change the underlying data, I just want to apply a schema over something > that was written via the sparkContext.newAPIHadoopRDD > > Basically I am reading in a RDD[JsonObject] and would like to convert it > into a dataframe which I pass the schema into > > Whats the best way to do this? > > I doubt removing all the quotes in the JSON is the best solution is it? 
> > Regards > Sam > > On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho < > dirceu.semigh...@gmail.com> wrote: > > Hi Sam > Remove the " from the number that it will work > > Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com> > escreveu: > > Hi All > > I would like to specify a schema when reading from a json but when trying > to map a number to a Double it fails, I tried FloatType and IntType with no > joy! > > > When inferring the schema customer id is set to String, and I would like > to cast it as Double > > so df1 is corrupted while df2 shows > > > Also FYI I need this to be generic as I would like to apply it to any > json, I specified the below schema as an example of the issue I am facing > > import org.apache.spark.sql.types.{BinaryType, StringType, StructField, > DoubleType,FloatType, StructType, LongType,DecimalType} > val testSchema = StructType(Array(StructField("customerid",DoubleType))) > val df1 = > spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}"""))) > val df2 = > spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}"""))) > df1.show(1) > df2.show(1) > > > Any help would be appreciated, I am sure I am missing something obvious > but for the life of me I cant tell what it is! > > > Kind Regards > Sam > > > >
Re: specifying schema on dataframe
Hi Direceu Thanks your right! that did work But now im facing an even bigger problem since i dont have access to change the underlying data, I just want to apply a schema over something that was written via the sparkContext.newAPIHadoopRDD Basically I am reading in a RDD[JsonObject] and would like to convert it into a dataframe which I pass the schema into Whats the best way to do this? I doubt removing all the quotes in the JSON is the best solution is it? Regards Sam On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho < dirceu.semigh...@gmail.com> wrote: > Hi Sam > Remove the " from the number that it will work > > Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com> > escreveu: > >> Hi All >> >> I would like to specify a schema when reading from a json but when trying >> to map a number to a Double it fails, I tried FloatType and IntType with no >> joy! >> >> >> When inferring the schema customer id is set to String, and I would like >> to cast it as Double >> >> so df1 is corrupted while df2 shows >> >> >> Also FYI I need this to be generic as I would like to apply it to any >> json, I specified the below schema as an example of the issue I am facing >> >> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, >> DoubleType,FloatType, StructType, LongType,DecimalType} >> val testSchema = StructType(Array(StructField("customerid",DoubleType))) >> val df1 = >> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}"""))) >> val df2 = >> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}"""))) >> df1.show(1) >> df2.show(1) >> >> >> Any help would be appreciated, I am sure I am missing something obvious >> but for the life of me I cant tell what it is! >> >> >> Kind Regards >> Sam >> >
specifying schema on dataframe
Hi All I would like to specify a schema when reading from a json but when trying to map a number to a Double it fails, I tried FloatType and IntType with no joy! When inferring the schema customer id is set to String, and I would like to cast it as Double so df1 is corrupted while df2 shows Also FYI I need this to be generic as I would like to apply it to any json, I specified the below schema as an example of the issue I am facing import org.apache.spark.sql.types.{BinaryType, StringType, StructField, DoubleType,FloatType, StructType, LongType,DecimalType} val testSchema = StructType(Array(StructField("customerid",DoubleType))) val df1 = spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}"""))) val df2 = spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}"""))) df1.show(1) df2.show(1) Any help would be appreciated, I am sure I am missing something obvious but for the life of me I cant tell what it is! Kind Regards Sam
Re: java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef
Hi sathyanarayanan

zero() on scala.runtime.VolatileObjectRef was introduced in Scala 2.11. You probably have a library compiled against Scala 2.11 running on a Scala 2.10 runtime.

See
v2.10: https://github.com/scala/scala/blob/2.10.x/src/library/scala/runtime/VolatileObjectRef.java
v2.11: https://github.com/scala/scala/blob/2.11.x/src/library/scala/runtime/VolatileObjectRef.java

Regards
Sam

On Sat, 4 Feb 2017 at 09:24, sathyanarayanan mudhaliyar < sathyanarayananmudhali...@gmail.com> wrote:
> Hi,
> I got the error below when executed:
>
> Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
>
> error in detail:
>
> Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
> at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala)
> at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
> at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
> at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
> at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
> at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
> at com.nwf.Consumer.main(Consumer.java:63)
>
> code:
>
> Consumer consumer = new Consumer();
> SparkConf conf = new SparkConf().setAppName("kafka-sandbox").setMaster("local[2]");
> conf.set("spark.cassandra.connection.host", "localhost"); // connection for cassandra database
> JavaSparkContext sc = new JavaSparkContext(conf);
> CassandraConnector connector = CassandraConnector.apply(sc.getConf());
> final Session session = connector.openSession();
> final PreparedStatement prepared = session.prepare("INSERT INTO spark_test5.messages JSON?");
>
> The error is in the line which is in green color (per the stack trace, the connector.openSession() call at Consumer.java:63).
> Thank you guys.
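The mismatch Sam describes usually shows up directly in artifact file names, since the Scala binary version is conventionally embedded as a `_2.10`/`_2.11` suffix. A small illustrative helper (not from the thread; names and the regex heuristic are assumptions) that flags a mixed classpath:

```python
import re

def scala_binary_versions(jar_names):
    """Heuristic: extract the _2.xx Scala binary-version suffix from
    artifact file names and return the distinct versions found.
    More than one distinct version on a classpath is a red flag for
    NoSuchMethodError crashes like the one above."""
    found = set()
    for name in jar_names:
        m = re.search(r"_(2\.\d+)-", name)
        if m:
            found.add(m.group(1))
    return sorted(found)

jars = [
    "spark-core_2.10-1.6.0.jar",                 # illustrative names
    "spark-cassandra-connector_2.11-1.6.0.jar",
]
print(scala_binary_versions(jars))  # ['2.10', '2.11'] -> mixed versions
```

In an sbt build the usual safeguard is to declare dependencies with `%%` so the Scala binary version is appended automatically and consistently.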
Upgrading to Spark 2.0.1 broke array in parquet DataFrame
I have a table with a few columns, some of which are arrays. Since upgrading from Spark 1.6 to Spark 2.0.1, the array fields are always null when reading in a DataFrame.

When writing the Parquet files, the schema of the column is specified as

StructField("packageIds", ArrayType(StringType))

The schema of the column in the Hive Metastore is

packageIds array<string>

The schema used in the writer exactly matches the schema in the Metastore in all ways (order, casing, types etc). The query is a simple "select *":

spark.sql("select * from tablename limit 1").collect() // null columns in Row

How can I begin debugging this issue? Notable things I've already investigated:
- Files were written using Spark 1.6
- DataFrame works in Spark 1.5 and 1.6
- I've inspected the parquet files using parquet-tools and can see the data.
- I also have another table written in exactly the same way and it doesn't have the issue.
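Since the writer schema and the Metastore schema must line up in order, casing, and types, one concrete debugging step is to diff the two mechanically rather than by eye. An illustrative Spark-free sketch (the ordered `(name, type)` list representation and the function name are assumptions, not a Spark API):

```python
def schema_diff(writer_schema, metastore_schema):
    """Compare two schemas given as ordered (name, type) lists and
    report any difference in order, casing, or type -- the three
    things the email says must match exactly."""
    problems = []
    if len(writer_schema) != len(metastore_schema):
        problems.append("column count differs")
    for i, ((wn, wt), (mn, mt)) in enumerate(zip(writer_schema, metastore_schema)):
        if wn != mn:
            kind = "casing" if wn.lower() == mn.lower() else "name/order"
            problems.append(f"col {i}: {kind} mismatch: {wn!r} vs {mn!r}")
        if wt != mt:
            problems.append(f"col {i}: type mismatch: {wt} vs {mt}")
    return problems

# Example: a casing-only difference, easy to miss by eye.
print(schema_diff(
    [("packageIds", "array<string>")],
    [("packageids", "array<string>")],
))  # ["col 0: casing mismatch: 'packageIds' vs 'packageids'"]
```

In practice the two inputs could be built from the parquet-tools schema dump and from `DESCRIBE tablename`, so the comparison covers exactly what each reader actually sees.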
Re: Spark join and large temp files
Have you tried to broadcast your small table in order to perform your join?

joined = bigDF.join(broadcast(smallDF), Seq("id"))

On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab wrote:
> Hi Deepak,
> No... not really. Upping the disk size is a solution, but more expensive as you can't attach EBS volumes to EMR clusters configured with data pipelines easily (which is what we're doing). I've tried collecting the 1.5G dataset in a hashmap, and broadcasting. Timeouts seem to prevent that (even after upping the max driver result size). Increasing partition counts didn't help (the shuffle used up the temp space). I'm now looking at some form of clever broadcasting, or maybe falling back to chunking up the input, producing interim output, and unioning them for the final output. Might even try using Spark Streaming pointing to the parquet and seeing if that helps.
>
> -Ashic.
>
> --
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 17:31:19 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
>
> Hi Ashic
> Did you find the resolution to this issue?
> Just curious to know what helped in this scenario.
>
> Thanks
> Deepak
>
> On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab wrote:
>
> Hi Deepak,
> Thanks for the response.
>
> Registering the temp tables didn't help. Here's what I have:
>
> val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
> val b = sqlContext.read.parquet(...).select("id", "number")
>
> a.registerTempTable("a")
> b.registerTempTable("b")
>
> val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id=y.id")
>
> results.write.parquet(...)
>
> Is there something I'm missing?
>
> Cheers,
> Ashic.
> > -- > From: deepakmc...@gmail.com > Date: Tue, 9 Aug 2016 00:01:32 +0530 > Subject: Re: Spark join and large temp files > To: as...@live.com > CC: user@spark.apache.org > > > Register you dataframes as temp tables and then try the join on the temp > table. > This should resolve your issue. > > Thanks > Deepak > > On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab wrote: > > Hello, > We have two parquet inputs of the following form: > > a: id:String, Name:String (1.5TB) > b: id:String, Number:Int (1.3GB) > > We need to join these two to get (id, Number, Name). We've tried two > approaches: > > a.join(b, Seq("id"), "right_outer") > > where a and b are dataframes. We also tried taking the rdds, mapping them > to pair rdds with id as the key, and then joining. What we're seeing is > that temp file usage is increasing on the join stage, and filling up our > disks, causing the job to crash. Is there a way to join these two data sets > without well...crashing? > > Note, the ids are unique, and there's a one to one mapping between the two > datasets. > > Any help would be appreciated. > > -Ashic. > > > > > > > > -- > Thanks > Deepak > www.bigdatabig.com > www.keosha.net > > > > > -- > Thanks > Deepak > www.bigdatabig.com > www.keosha.net >
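The approach Ashic describes — collect the small (1.3GB) side into a map and stream the large side past it — is the shape of a broadcast hash join, which avoids the shuffle (and therefore the shuffle's temp-file usage) entirely. A Spark-free sketch of the idea (data and names are illustrative, not from the thread):

```python
def hash_join(big_rows, small_rows):
    """Hash-join sketch: build an in-memory map from the small side
    (id -> number), then stream the large side through it. This is the
    same shape as Spark's broadcast join: the 1.5TB side is never
    shuffled, only scanned once."""
    lookup = {row_id: number for row_id, number in small_rows}
    for row_id, name in big_rows:      # streamed, never materialized
        if row_id in lookup:           # ids are unique, 1:1 mapping
            yield (row_id, lookup[row_id], name)

big = [("a1", "alice"), ("a2", "bob")]     # stands in for (id, Name)
small = [("a1", 10), ("a2", 20)]           # stands in for (id, Number)
print(list(hash_join(big, small)))  # [('a1', 10, 'alice'), ('a2', 20, 'bob')]
```

The catch, as the thread notes, is that the small side must actually fit in each executor's memory, and driver-side collection can hit timeouts and result-size limits before the join itself ever runs.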
Re: hdfs-ha on mesos - odd bug
I don't know about the broken URL. But are you running HDFS as a Mesos framework? If so, is it using mesos-dns? Then you should resolve the namenode via hdfs:///

On Mon, Sep 14, 2015 at 3:55 PM, Adrian Bridgett wrote:
> I'm hitting an odd issue with running spark on mesos together with HA-HDFS, with an even odder workaround.
>
> In particular I get an error that it can't find the HDFS nameservice unless I put in a _broken_ url (discovered that workaround by mistake!). core-site.xml and hdfs-site.xml are distributed to the slave node, and that file is read, since if I deliberately break the file I get an error as you'd expect.
>
> NB: This is a bit different to http://mail-archives.us.apache.org/mod_mbox/spark-user/201402.mbox/%3c1392442185079-1549.p...@n3.nabble.com%3E
>
> Spark 1.5.0:
>
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> (fails)
>
> t=sc.textFile("file://etc/passwd")
> t.count()
> (errors about bad url - should have an extra / of course)
>
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> then it works!!!
>
> I should say that using file:///etc/passwd or hdfs:///tmp/issue both fail as well, unless preceded by a broken url. I've tried setting spark.hadoop.cloneConf to true, no change.
> > Sample (broken) run: > 15/09/14 13:00:14 DEBUG HadoopRDD: Creating new JobConf and caching it for > later re-use > 15/09/14 13:00:14 DEBUG : address: ip-10-1-200-165/10.1.200.165 > isLoopbackAddress: false, with host 10.1.200.165 ip-10-1-200-165 > 15/09/14 13:00:14 DEBUG BlockReaderLocal: > dfs.client.use.legacy.blockreader.local = false > 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit = > false > 15/09/14 13:00:14 DEBUG BlockReaderLocal: > dfs.client.domain.socket.data.traffic = false > 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path = > /var/run/hdfs-sockets/dn > 15/09/14 13:00:14 DEBUG HAUtil: No HA service delegation token found for > logical URI hdfs://nameservice1 > 15/09/14 13:00:14 DEBUG BlockReaderLocal: > dfs.client.use.legacy.blockreader.local = false > 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit = > false > 15/09/14 13:00:14 DEBUG BlockReaderLocal: > dfs.client.domain.socket.data.traffic = false > 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path = > /var/run/hdfs-sockets/dn > 15/09/14 13:00:14 DEBUG RetryUtils: multipleLinearRandomRetry = null > 15/09/14 13:00:14 DEBUG Server: rpcKind=RPC_PROTOCOL_BUFFER, > rpcRequestWrapperClass=class > org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper, > rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@6245f50b > 15/09/14 13:00:14 DEBUG Client: getting client out of cache: > org.apache.hadoop.ipc.Client@267f0fd3 > 15/09/14 13:00:14 DEBUG NativeCodeLoader: Trying to load the custom-built > native-hadoop library... > 15/09/14 13:00:14 DEBUG NativeCodeLoader: Loaded the native-hadoop library > ... 
> 15/09/14 13:00:14 DEBUG Client: Connecting to > mesos-1.example.com/10.1.200.165:8020 > 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu: starting, having > connections 1 > 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #0 > 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #0 > 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getFileInfo took 36ms > 15/09/14 13:00:14 DEBUG FileInputFormat: Time taken to get FileStatuses: 69 > 15/09/14 13:00:14 INFO FileInputFormat: Total input paths to process : 1 > 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #1 > 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #1 > 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 1ms > 15/09/14 13:00:14 DEBUG FileInputFormat: Total # of splits generated by > getSplits: 2, TimeTaken: 104 > ... 
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu: closed > 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to > mesos-1.example.com/10.1.200.165:8020 from ubuntu: stopped, remaining > connections 0 > 15/09/14 13:00:24 DEBUG > AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received > message > AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true) > from Actor[akka://sparkDriver/temp/$g] > 15/09/14 13:00:24 DEBUG > AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: > AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true) > 15/09/14 13:00:24 DEBUG > AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled > message
Re: *Metrics API is odd in MLLib
Hi Xiangrui Spark People, I recently got round to writing an evaluation framework for Spark that I was hoping to PR into MLLib and this would solve some of the aforementioned issues. I have put the code on github in a separate repo for now as I would like to get some sandboxed feedback. The repo complete with detailed documentation can be found here https://github.com/samthebest/sceval. Many thanks, Sam On Thu, Jun 18, 2015 at 11:00 AM, Sam samthesav...@gmail.com wrote: Firstly apologies for the header of my email containing some junk, I believe it's due to a copy and paste error on a smart phone. Thanks for your response. I will indeed make the PR you suggest, though glancing at the code I realize it's not just a case of making these public since the types are also private. Then, there is certain functionality I will be exposing, which then ought to be tested, e.g. every bin except potentially the last will have an equal number of data points in it*. I'll get round to it at some point. As for BinaryClassificationMetrics using Double for labels, thanks for the explanation. If I where to make a PR to encapsulate the underlying implementation (that uses LabeledPoint) and change the type to Boolean, would what be the impact to versioning (since I'd be changing public API)? An alternative would be to create a new wrapper class, say BinaryClassificationMeasures, and deprecate the old with the intention of migrating all the code into the new class. * Maybe some other part of the code base tests this, since this assumption must hold in order to average across folds in x-validation? On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote: LabeledPoint was used for both classification and regression, where label type is Double for simplicity. So in BinaryClassificationMetrics, we still use Double for labels. 
We compute the confusion matrix at each threshold internally, but this is not exposed to users ( https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127). Feel free to submit a PR to make it public. -Xiangrui On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote: According to https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics the constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles; this seems odd, shouldn't it be Boolean? Similarly for MultilabelMetrics (i.e. should be RDD[(Array[Double], Array[Boolean])]), and for MulticlassMetrics the type of both should be generic? Additionally it would be good if either the ROC output type was changed or another method was added that returned confusion matrices, so that the hard integer values can be obtained before the divisions. E.g. ``` case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int) { // bunch of methods for each of the things in the table here https://en.wikipedia.org/wiki/Receiver_operating_characteristic } ... def confusions(): RDD[Confusion] ```
Re: *Metrics API is odd in MLLib
Firstly apologies for the header of my email containing some junk, I believe it's due to a copy and paste error on a smart phone. Thanks for your response. I will indeed make the PR you suggest, though glancing at the code I realize it's not just a case of making these public since the types are also private. Then, there is certain functionality I will be exposing, which then ought to be tested, e.g. every bin except potentially the last will have an equal number of data points in it*. I'll get round to it at some point. As for BinaryClassificationMetrics using Double for labels, thanks for the explanation. If I were to make a PR to encapsulate the underlying implementation (that uses LabeledPoint) and change the type to Boolean, what would be the impact on versioning (since I'd be changing the public API)? An alternative would be to create a new wrapper class, say BinaryClassificationMeasures, and deprecate the old with the intention of migrating all the code into the new class. * Maybe some other part of the code base tests this, since this assumption must hold in order to average across folds in x-validation? On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote: LabeledPoint was used for both classification and regression, where label type is Double for simplicity. So in BinaryClassificationMetrics, we still use Double for labels.
-Xiangrui On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote: According to https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics the constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles; this seems odd, shouldn't it be Boolean? Similarly for MultilabelMetrics (i.e. should be RDD[(Array[Double], Array[Boolean])]), and for MulticlassMetrics the type of both should be generic? Additionally it would be good if either the ROC output type was changed or another method was added that returned confusion matrices, so that the hard integer values can be obtained before the divisions. E.g. ``` case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int) { // bunch of methods for each of the things in the table here https://en.wikipedia.org/wiki/Receiver_operating_characteristic } ... def confusions(): RDD[Confusion] ```
Metrics API is odd in MLLib
According to https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics the constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles. This seems odd; shouldn't it be Boolean? Similarly for MultilabelMetrics (i.e. it should be RDD[(Array[Double], Array[Boolean])]), and for MulticlassMetrics the types of both should be generic?

Additionally, it would be good if either the ROC output type was changed or another method was added that returned confusion matrices, so that the hard integer values can be obtained before the divisions. E.g.

```
case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int) {
  // methods for each of the measures in the table here:
  // https://en.wikipedia.org/wiki/Receiver_operating_characteristic
}
...
def confusions(): RDD[Confusion]
```
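To make the proposal concrete, here is a plain-Python sketch of the suggested `confusions()` idea, returning raw integer counts per threshold before any division. The names and structure are hypothetical, not an existing MLlib API:

```python
from collections import namedtuple

Confusion = namedtuple("Confusion", ["threshold", "tp", "fp", "fn", "tn"])

def confusions(scored_labels):
    """For each distinct score used as a threshold (predict positive when
    score >= threshold), return the raw integer confusion counts.
    scored_labels: list of (score, label) pairs with boolean labels."""
    out = []
    for threshold in sorted({s for s, _ in scored_labels}, reverse=True):
        tp = sum(1 for s, l in scored_labels if s >= threshold and l)
        fp = sum(1 for s, l in scored_labels if s >= threshold and not l)
        fn = sum(1 for s, l in scored_labels if s < threshold and l)
        tn = sum(1 for s, l in scored_labels if s < threshold and not l)
        out.append(Confusion(threshold, tp, fp, fn, tn))
    return out

data = [(0.9, True), (0.8, False), (0.4, True), (0.1, False)]
counts = confusions(data)
```

Every derived measure in the Wikipedia table (precision, recall, FPR, ...) is then a division over these hard counts, which is exactly why exposing them first is useful.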
Spark Python with SequenceFile containing numpy deserialized data in str form
Hi all,

I'm storing an RDD as a sequence file with the following content: key = filename (string), value = python str from numpy.savez (not unicode). In order to make sure the whole numpy array gets stored, I first serialize it with:

```
def serialize_numpy_array(numpy_array):
    output = io.BytesIO()
    np.savez_compressed(output, x=numpy_array)
    return output.getvalue()

>>> type(output.getvalue())
str
```

The serialization returns a python str, *not a unicode object*. After serialization I call my_deserialized_numpy_rdd.saveAsSequenceFile(path); all works well and the RDD gets stored successfully.

Now the problem starts when I want to read the sequence file again:

```
my_deserialized_numpy_rdd = sc.sequenceFile(path)
first = my_deserialized_numpy_rdd.first()
>>> type(first[1])
unicode
```

The previous str became a unicode object after we stored it to a sequence file and read it again. Trying to convert it back with first[1].decode("ascii") fails with: UnicodeEncodeError: 'ascii' codec can't encode characters in position 1-3: ordinal not in range(128).

My expectation was that I would get the data back exactly as I stored it, i.e. in str format and not in unicode format. Any suggestions on how I can read back the original data? I will try converting the str to a bytearray before storing it to a sequence file.

Thanks,
Sam Stoelinga
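One possible workaround, sketched here in pure Python (untested against Spark's SequenceFile path, so treat it as an assumption): base64-encode the binary payload before storing it, so a str/unicode round-trip through a text-typed container cannot corrupt it, at the cost of roughly 33% size overhead:

```python
import base64

def encode_payload(raw_bytes):
    """Make arbitrary binary data safe to store in a text-typed value:
    base64 output uses only ASCII characters, so encoding/decoding it as
    unicode text cannot mangle any byte."""
    return base64.b64encode(raw_bytes).decode("ascii")

def decode_payload(text):
    """Recover the original bytes from the ASCII-safe representation."""
    return base64.b64decode(text.encode("ascii"))

blob = bytes(bytearray(range(256)))  # every byte value, incl. non-ASCII ones
roundtripped = decode_payload(encode_payload(blob))
```

The UnicodeEncodeError above happens precisely because the raw savez output contains bytes outside the ASCII range; after base64 encoding there are none left to trip over.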
Re: Spark Python with SequenceFile containing numpy deserialized data in str form
Update: using a bytearray before storing to the RDD is not a solution either. This happens when trying to read the RDD back when the value was stored as a python bytearray:

```
Traceback (most recent call last):
  File "/vagrant/python/kmeans.py", line 24, in <module>
    features = sc.sequenceFile(feature_sequencefile_path)
  File "/usr/local/spark/python/pyspark/context.py", line 490, in sequenceFile
    keyConverter, valueConverter, minSplits, batchSize)
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.sequenceFile.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/tmp/feature-bytearray
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
	at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
	at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:205)
	at org.apache.spark.api.python.PythonRDD$.sequenceFile(PythonRDD.scala:447)
	at org.apache.spark.api.python.PythonRDD.sequenceFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
```

On Tue, Jun 9, 2015 at 11:04 AM, Sam Stoelinga sammiest...@gmail.com wrote:

> Hi all,
>
> I'm storing an RDD as a sequence file with key = filename (string) and value = python str from numpy.savez (not unicode). To make sure the whole numpy array gets stored I first serialize it with:
>
> ```
> def serialize_numpy_array(numpy_array):
>     output = io.BytesIO()
>     np.savez_compressed(output, x=numpy_array)
>     return output.getvalue()
> ```
>
> The serialization returns a python str, *not a unicode object*. After serialization I call my_deserialized_numpy_rdd.saveAsSequenceFile(path); all works well and the RDD gets stored successfully. The problem starts when I want to read the sequence file again: type(first[1]) is now unicode, and trying to convert it back with first[1].decode("ascii") fails with UnicodeEncodeError: 'ascii' codec can't encode characters in position 1-3: ordinal not in range(128). My expectation was that I would get the data back exactly as I stored it, i.e. in str format and not in unicode format. Any suggestions on how I can read back the original data? I will try converting the str to a bytearray before storing it to a sequence file.
>
> Thanks,
> Sam Stoelinga
Re: Spark Python with SequenceFile containing numpy deserialized data in str form
Update: I've done a workaround using saveAsPickleFile instead, which handles everything correctly: the data stays in byte format. I noticed python gets messy with str and bytes being the same type in Python 2.7, and I wonder whether Python 3 would have the same problem. I would still like a SequenceFile that is usable across languages instead of a PickleFile though, so if anybody has pointers I would appreciate that :)

On Tue, Jun 9, 2015 at 11:35 AM, Sam Stoelinga sammiest...@gmail.com wrote:

> Update: using a bytearray before storing to the RDD is not a solution either. Reading the RDD back when the value was stored as a python bytearray fails with:
>
> ```
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.sequenceFile.
> : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/tmp/feature-bytearray
> 	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
> 	[...]
> 	at java.lang.Thread.run(Thread.java:745)
> ```
>
> On Tue, Jun 9, 2015 at 11:04 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>
>> Hi all,
>>
>> I'm storing an RDD as a sequence file with key = filename (string) and value = python str from numpy.savez (not unicode). The str became a unicode object after we stored it to a sequence file and read it again, and trying to convert it back with first[1].decode("ascii") fails with UnicodeEncodeError. My expectation was that I would get the data back exactly as I stored it, i.e. in str format and not in unicode format. Any suggestions on how I can read back the original data? [...]
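On the Python 2.7 str/bytes confusion raised above: in Python 3, bytes and str are distinct types, so this class of bug surfaces immediately instead of silently. A small plain-Python illustration (no Spark involved) of why a pickle-based format preserves the byte payload exactly:

```python
import pickle

# Arbitrary binary payload: not valid ASCII or UTF-8 text.
payload = b"\x80\x02binary\x00data"

# pickle records the type alongside the value, so bytes come back as bytes,
# unlike a text-oriented container that would try to decode them on read.
restored = pickle.loads(pickle.dumps(payload))

same_type = type(restored) is bytes
same_value = restored == payload
```

This is exactly what saveAsPickleFile buys you, at the price of a Python-only on-disk format; a cross-language SequenceFile would need the value written as a bytes-typed Writable (e.g. BytesWritable) rather than Text.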
Re: PySpark with OpenCV causes python worker to crash
I've changed the SIFT feature extraction to SURF feature extraction and it works... The following line was changed:

```
sift = cv2.xfeatures2d.SIFT_create()
```

to:

```
sift = cv2.xfeatures2d.SURF_create()
```

Where should I file this as a bug? When not running on Spark it works fine, so I'm saying it's a Spark bug.

On Fri, Jun 5, 2015 at 2:17 PM, Sam Stoelinga sammiest...@gmail.com wrote:

> Yea, I should have emphasized that: I'm running the same code on the same VM. It's a VM with Spark in standalone mode, and I run the unit test directly on that same VM. So OpenCV is working correctly on that same machine, but when moving the exact same OpenCV code to Spark it just crashes.
>
> On Tue, Jun 2, 2015 at 5:06 AM, Davies Liu dav...@databricks.com wrote:
>
>> Could you run the single-thread version on the worker machine to make sure that OpenCV is installed and configured correctly?
>>
>> On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>>
>>> I've verified that the issue lies within Spark running the OpenCV code and not within the sequence file BytesWritable formatting. This code reproduces the failure without using the sequence file as input at all, by running the same function with the same input on Spark:
>>>
>>> ```
>>> def extract_sift_features_opencv(imgfile_imgbytes):
>>>     imgfilename, discardsequencefile = imgfile_imgbytes
>>>     imgbytes = bytearray(open("/tmp/img.jpg", "rb").read())
>>>     nparr = np.fromstring(buffer(imgbytes), np.uint8)
>>>     img = cv2.imdecode(nparr, 1)
>>>     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
>>>     sift = cv2.xfeatures2d.SIFT_create()
>>>     kp, descriptors = sift.detectAndCompute(gray, None)
>>>     return (imgfilename, "test")
>>> ```
>>>
>>> And the corresponding tests.py: https://gist.github.com/samos123/d383c26f6d47d34d32d6
>>>
>>> On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com wrote:
>>>
>>>> Thanks for the advice! The following line causes Spark to crash:
>>>>
>>>> ```
>>>> kp, descriptors = sift.detectAndCompute(gray, None)
>>>> ```
>>>>
>>>> But I do need this line to be executed, and the code does not crash when running outside of Spark with the same parameters. You're saying maybe the bytes from the sequence file got somehow transformed and don't represent an image anymore, causing OpenCV to crash the whole python executor.
>>>>
>>>> On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com wrote:
>>>>
>>>>> Could you try to comment out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes coming from sequenceFile() are broken, it's easy to crash a C library in Python (OpenCV).
>>>>>
>>>>> On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>>>>>
>>>>>> Hi sparkers,
>>>>>>
>>>>>> I am working on a PySpark application which uses the OpenCV library. It runs fine when running the code locally, but when I try to run it on Spark on the same machine it crashes the worker. The code can be found here: https://gist.github.com/samos123/885f9fe87c8fa5abf78f and this is the error message taken from STDERR of the worker log: https://gist.github.com/samos123/3300191684aee7fc8013
>>>>>>
>>>>>> Would like pointers or tips on how to debug further? Would be nice to know the reason why the worker crashed.
>>>>>>
>>>>>> Thanks,
>>>>>> Sam Stoelinga
>>>>>>
>>>>>> ```
>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>>> 	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
>>>>>> 	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
>>>>>> 	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
>>>>>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.EOFException
>>>>>> 	at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>>>>> 	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
>>>>>> ```
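One debugging step that might help before blaming OpenCV itself (my own suggestion; the cv2 calls are omitted so this sketch runs anywhere): validate that the bytes reaching the worker still look like a JPEG before handing them to a native decoder, since a corrupted buffer fed into C code can take down the whole python worker rather than raising a Python exception:

```python
def looks_like_jpeg(data):
    """Cheap sanity check on a byte buffer before passing it to a native
    image decoder: JPEG streams start with FF D8 and end with FF D9."""
    b = bytes(data)
    return len(b) >= 4 and b[:2] == b"\xff\xd8" and b[-2:] == b"\xff\xd9"

# A well-framed (if fake) buffer, and the kind of mangled bytes a
# str/unicode round-trip tends to produce (replacement-character debris).
good = b"\xff\xd8" + b"\x00" * 10 + b"\xff\xd9"
bad = b"\xef\xbf\xbd\xd8" + b"\x00" * 10
```

In the mapped function you could log and skip records failing this check instead of calling imdecode on them; if records fail it, the corruption happened before OpenCV was ever involved.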
Re: PySpark with OpenCV causes python worker to crash
Yea, I should have emphasized that: I'm running the same code on the same VM. It's a VM with Spark in standalone mode, and I run the unit test directly on that same VM. So OpenCV is working correctly on that same machine, but when moving the exact same OpenCV code to Spark it just crashes.

On Tue, Jun 2, 2015 at 5:06 AM, Davies Liu dav...@databricks.com wrote:

> Could you run the single-thread version on the worker machine to make sure that OpenCV is installed and configured correctly?
>
> On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>
>> I've verified that the issue lies within Spark running the OpenCV code and not within the sequence file BytesWritable formatting. This code reproduces the failure without using the sequence file as input at all, by running the same function with the same input on Spark:
>>
>> ```
>> def extract_sift_features_opencv(imgfile_imgbytes):
>>     imgfilename, discardsequencefile = imgfile_imgbytes
>>     imgbytes = bytearray(open("/tmp/img.jpg", "rb").read())
>>     nparr = np.fromstring(buffer(imgbytes), np.uint8)
>>     img = cv2.imdecode(nparr, 1)
>>     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
>>     sift = cv2.xfeatures2d.SIFT_create()
>>     kp, descriptors = sift.detectAndCompute(gray, None)
>>     return (imgfilename, "test")
>> ```
>>
>> And the corresponding tests.py: https://gist.github.com/samos123/d383c26f6d47d34d32d6
>>
>> On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com wrote:
>>
>>> Thanks for the advice! The following line causes Spark to crash: kp, descriptors = sift.detectAndCompute(gray, None). But I do need this line to be executed, and the code does not crash when running outside of Spark with the same parameters. You're saying maybe the bytes from the sequence file got somehow transformed and don't represent an image anymore, causing OpenCV to crash the whole python executor.
>>>
>>> On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com wrote:
>>>
>>>> Could you try to comment out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes coming from sequenceFile() are broken, it's easy to crash a C library in Python (OpenCV).
>>>>
>>>> On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>>>>
>>>>> Hi sparkers, I am working on a PySpark application which uses the OpenCV library. It runs fine when running the code locally, but when I try to run it on Spark on the same machine it crashes the worker. The code: https://gist.github.com/samos123/885f9fe87c8fa5abf78f and the error message from STDERR of the worker log: https://gist.github.com/samos123/3300191684aee7fc8013 Would like pointers or tips on how to debug further? Would be nice to know the reason why the worker crashed.
>>>>>
>>>>> Thanks, Sam Stoelinga
>>>>>
>>>>> ```
>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>> 	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:172)
>>>>> 	[...]
>>>>> Caused by: java.io.EOFException
>>>>> 	at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>>>> 	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108)
>>>>> ```
Re: PySpark with OpenCV causes python worker to crash
Thanks Davies. I will file a bug later with the code and a single image as the dataset. In addition, I can give anybody access to my vagrant VM that already has Spark with OpenCV and the dataset available. Or you can set up the same vagrant machine at your place; it's all automated ^^

```
git clone https://github.com/samos123/computer-vision-cloud-platform
cd computer-vision-cloud-platform
./scripts/setup.sh
vagrant ssh
```

(Expect failures; I haven't cleaned it up and tested it for other people.)

Btw, I also currently study at Tsinghua.

On Fri, Jun 5, 2015 at 2:43 PM, Davies Liu dav...@databricks.com wrote:

> Please file a bug here: https://issues.apache.org/jira/browse/SPARK/
>
> Could you also provide a way to reproduce this bug (including some datasets)?
>
> On Thu, Jun 4, 2015 at 11:30 PM, Sam Stoelinga sammiest...@gmail.com wrote:
>
>> I've changed the SIFT feature extraction to SURF feature extraction and it works... The following line was changed: sift = cv2.xfeatures2d.SIFT_create() to sift = cv2.xfeatures2d.SURF_create(). Where should I file this as a bug? When not running on Spark it works fine, so I'm saying it's a Spark bug.
>>
>> On Fri, Jun 5, 2015 at 2:17 PM, Sam Stoelinga sammiest...@gmail.com wrote:
>>
>>> Yea, I should have emphasized that: I'm running the same code on the same VM. It's a VM with Spark in standalone mode, and I run the unit test directly on that same VM. So OpenCV is working correctly on that same machine, but when moving the exact same OpenCV code to Spark it just crashes.
>>>
>>> On Tue, Jun 2, 2015 at 5:06 AM, Davies Liu dav...@databricks.com wrote:
>>>
>>>> Could you run the single-thread version on the worker machine to make sure that OpenCV is installed and configured correctly?
>>>>
>>>> On Sat, May 30, 2015 at 6:29 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>>>>
>>>>> I've verified that the issue lies within Spark running the OpenCV code and not within the sequence file BytesWritable formatting. This code reproduces the failure without using the sequence file as input at all, by running the same function with the same input on Spark:
>>>>>
>>>>> ```
>>>>> def extract_sift_features_opencv(imgfile_imgbytes):
>>>>>     imgfilename, discardsequencefile = imgfile_imgbytes
>>>>>     imgbytes = bytearray(open("/tmp/img.jpg", "rb").read())
>>>>>     nparr = np.fromstring(buffer(imgbytes), np.uint8)
>>>>>     img = cv2.imdecode(nparr, 1)
>>>>>     gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
>>>>>     sift = cv2.xfeatures2d.SIFT_create()
>>>>>     kp, descriptors = sift.detectAndCompute(gray, None)
>>>>>     return (imgfilename, "test")
>>>>> ```
>>>>>
>>>>> And the corresponding tests.py: https://gist.github.com/samos123/d383c26f6d47d34d32d6
>>>>>
>>>>> On Sat, May 30, 2015 at 8:04 PM, Sam Stoelinga sammiest...@gmail.com wrote:
>>>>>
>>>>>> Thanks for the advice! The following line causes Spark to crash: kp, descriptors = sift.detectAndCompute(gray, None). But I do need this line to be executed, and the code does not crash when running outside of Spark with the same parameters. You're saying maybe the bytes from the sequence file got somehow transformed and don't represent an image anymore, causing OpenCV to crash the whole python executor.
>>>>>>
>>>>>> On Fri, May 29, 2015 at 2:06 AM, Davies Liu dav...@databricks.com wrote:
>>>>>>
>>>>>>> Could you try to comment out some lines in `extract_sift_features_opencv` to find which line causes the crash? If the bytes coming from sequenceFile() are broken, it's easy to crash a C library in Python (OpenCV).
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 8:33 AM, Sam Stoelinga sammiest...@gmail.com wrote:
>>>>>>>
>>>>>>>> Hi sparkers, I am working on a PySpark application which uses the OpenCV library. It runs fine when running the code locally, but when I try to run it on Spark on the same machine it crashes the worker. The code: https://gist.github.com/samos123/885f9fe87c8fa5abf78f and the error message from STDERR of the worker log: https://gist.github.com/samos123/3300191684aee7fc8013 Would like pointers or tips on how to debug further? [...]