Thanks, ayan.

On Mon, Sep 19, 2016 at 12:25 PM, ayan guha <guha.a...@gmail.com> wrote:
> Let me give you a possible direction; please do not use it as-is :)
>
> >>> r = sc.parallelize([1,3,4,6,8,11,12,5],3)
>
> Here, I am loading some numbers and partitioning them. The partitioning
> is critical: you may just use the partitioning scheme that comes with
> Spark (as above), or roll your own through partitionBy. It should meet
> two criteria:
>
> a) Even distribution, with each partition small enough to be held in
> memory.
> b) Partition boundaries are contiguous.
>
> Now let us write a function that operates on an iterator and does
> something with it (here it only concatenates, but you could use it to
> sort, loop through, and emit the missing ones):
>
> >>> def f(iterator):
> ...     yield ",".join(map(str, iterator))
>
> Now you can use an RDD operation to run this function on each partition:
>
> >>> r1 = r.mapPartitions(f)
>
> At that point you have the locally missing values, and you can write
> them out to a file.
>
> On Mon, Sep 19, 2016 at 4:39 PM, Sudhindra Magadi <smag...@gmail.com> wrote:
>
>> That is correct.
>>
>> On Mon, Sep 19, 2016 at 12:09 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Ok, so if you see
>>>
>>> 1, 3, 4, 6, ...
>>>
>>> will you say 2 and 5 are missing?
>>>
>>> On Mon, Sep 19, 2016 at 4:15 PM, Sudhindra Magadi <smag...@gmail.com> wrote:
>>>
>>>> Each of the records will have a sequence id. No duplicates.
>>>>
>>>> On Mon, Sep 19, 2016 at 11:42 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> And how do you define a missing sequence? Can you give an example?
>>>>>
>>>>> On Mon, Sep 19, 2016 at 3:48 PM, Sudhindra Magadi <smag...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jörn,
>>>>>> We have a file with a billion records. We want to find whether any
>>>>>> sequence numbers are missing, and if so, which ones.
>>>>>> Thanks
>>>>>> Sudhindra
>>>>>>
>>>>>> On Mon, Sep 19, 2016 at 11:12 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>>>>>>
>>>>>>> I am not sure what you are trying to achieve here. Can you please
>>>>>>> tell us what the goal of the program is, maybe with some example
>>>>>>> data?
>>>>>>>
>>>>>>> Besides this, I have the feeling that it will fail once it is used
>>>>>>> outside a single-node scenario, due to the reference to the global
>>>>>>> counter variable.
>>>>>>>
>>>>>>> It is also unclear why you collect the data first only to
>>>>>>> parallelize it again.
>>>>>>>
>>>>>>> On 18 Sep 2016, at 14:26, sudhindra <smag...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi, I have coded something like this; please tell me how bad it is.
>>>>>>>
>>>>>>> package Spark.spark;
>>>>>>>
>>>>>>> import java.util.List;
>>>>>>> import java.util.function.Function;
>>>>>>>
>>>>>>> import org.apache.spark.SparkConf;
>>>>>>> import org.apache.spark.SparkContext;
>>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>>> import org.apache.spark.api.java.JavaSparkContext;
>>>>>>> import org.apache.spark.sql.DataFrame;
>>>>>>> import org.apache.spark.sql.Dataset;
>>>>>>> import org.apache.spark.sql.Row;
>>>>>>> import org.apache.spark.sql.SQLContext;
>>>>>>>
>>>>>>> public class App
>>>>>>> {
>>>>>>>     static long counter = 1;
>>>>>>>
>>>>>>>     public static void main(String[] args)
>>>>>>>     {
>>>>>>>         SparkConf conf = new SparkConf()
>>>>>>>                 .setAppName("sorter")
>>>>>>>                 .setMaster("local[2]")
>>>>>>>                 .set("spark.executor.memory", "1g");
>>>>>>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>>>>>>
>>>>>>>         SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>>>>>>
>>>>>>>         DataFrame df = sqlContext.read().json("path");
>>>>>>>         DataFrame sortedDF = df.sort("id");
>>>>>>>         // df.show();
>>>>>>>         // sortedDF.printSchema();
>>>>>>>
>>>>>>>         System.out.println(sortedDF.collectAsList().toString());
>>>>>>>         JavaRDD<Row> distData = sc.parallelize(sortedDF.collectAsList());
>>>>>>>
>>>>>>>         List<String> missingNumbers = distData.map(
>>>>>>>                 new org.apache.spark.api.java.function.Function<Row, String>() {
>>>>>>>
>>>>>>>             public String call(Row arg0) throws Exception {
>>>>>>>                 if (counter != new Integer(arg0.getString(0)).intValue())
>>>>>>>                 {
>>>>>>>                     StringBuffer misses = new StringBuffer();
>>>>>>>                     long newCounter = counter;
>>>>>>>                     while (newCounter != new Integer(arg0.getString(0)).intValue())
>>>>>>>                     {
>>>>>>>                         misses.append(new Integer((int) counter).toString());
>>>>>>>                         newCounter++;
>>>>>>>                     }
>>>>>>>                     counter = new Integer(arg0.getString(0)).intValue() + 1;
>>>>>>>                     return misses.toString();
>>>>>>>                 }
>>>>>>>                 counter++;
>>>>>>>                 return null;
>>>>>>>             }
>>>>>>>         }).collect();
>>>>>>>
>>>>>>>         for (String name : missingNumbers) {
>>>>>>>             System.out.println(name);
>>>>>>>         }
>>>>>>>     }
>>>>>>> }

--
Thanks & Regards
Sudhindra S Magadi
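A minimal PySpark sketch of the partition-local step ayan describes above. This is an illustration added for clarity, not code from the thread, and it assumes the ids are already sorted and range-partitioned so that each partition holds a contiguous slice:

from pyspark import SparkContext

sc = SparkContext("local[2]", "gap-finder")

# Sorted sequence ids; with 3 slices, parallelize splits them into the
# contiguous ranges [1, 3], [4, 6] and [8, 11, 12].
r = sc.parallelize([1, 3, 4, 6, 8, 11, 12], 3)

def missing_in_partition(iterator):
    # Walk the ids of one partition in order and emit every value
    # skipped between consecutive entries.
    prev = None
    for x in iterator:
        if prev is not None:
            for gap in range(prev + 1, x):
                yield gap
        prev = x

print(r.mapPartitions(missing_in_partition).collect())
# [2, 5, 9, 10] -- note that 7 is not reported: it falls between two
# partitions (after 6, before 8), so per ayan's criterion (b), boundary
# gaps need a second, much smaller pass over each partition's min/max.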
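On Jörn's two objections to the Java program: the static counter lives per JVM, so each executor mutates its own copy and the logic only holds together in local mode; and collectAsList() drags every row to the driver only to parallelize it again, which defeats the purpose with a billion records. One shared-state-free alternative (again a hedged sketch, not code from the thread, assuming the duplicate-free ids are expected to run 1..N) is to compare each sorted id with its position:

# Reusing the SparkContext from the sketch above.
ids = sc.parallelize([1, 3, 4, 6, 8, 11, 12])

# After a distributed sort, the id at position i must equal i + 1 when
# nothing is missing; id - (i + 1) is the number of gaps before that id.
indexed = ids.sortBy(lambda x: x).zipWithIndex()
out_of_place = indexed.filter(lambda p: p[0] != p[1] + 1)

print(out_of_place.first())  # (3, 1): one value (2) is missing before id 3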