Let me give you a possible direction; please do not use it as is :)

>>> r = sc.parallelize([1,3,4,6,8,11,12,5],3)

Here I am loading some numbers and splitting them into 3 partitions. The
partitioning is critical. You can use the partitioning scheme that comes
with Spark (as above) or supply your own, for example through partitionBy
on a keyed RDD. Either way, it should meet two criteria:

a) The data is evenly distributed, and each partition is small enough to be
held in memory.
b) Partition boundaries are contiguous, i.e. each partition covers one
unbroken range of ids.
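
To make criterion (b) concrete, here is a rough sketch of a range partition
function in plain Python. The name make_range_partitioner and the id range
1..12 are my own assumptions for illustration; it also assumes ids are spread
fairly evenly over the range, otherwise criterion (a) will not hold.

```python
def make_range_partitioner(lo, hi, num_partitions):
    """Return a function mapping an id in [lo, hi] to a partition index.

    Each partition covers one contiguous, equal-width slice of the id range.
    """
    width = -(-(hi - lo + 1) // num_partitions)  # ceiling division

    def partition_for(key):
        idx = (key - lo) // width
        # Clamp so out-of-range ids still land in the first or last partition.
        return min(max(idx, 0), num_partitions - 1)

    return partition_for


# Plain-Python check with the sample numbers from above.
part = make_range_partitioner(1, 12, 3)
buckets = {}
for i in [1, 3, 4, 6, 8, 11, 12, 5]:
    buckets.setdefault(part(i), []).append(i)
print(buckets)  # {0: [1, 3, 4], 1: [6, 8, 5], 2: [11, 12]}

# With Spark, the same function could be handed to partitionBy on a keyed RDD:
#   ranged = r.map(lambda x: (x, None)).partitionBy(3, part)
```

If the ids are skewed, sortByKey may be the easier route, since it picks its
range boundaries by sampling the data.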

Now, let us write a function that operates on an iterator and does something
with it (here it only concatenates, but you can use it to sort, loop through,
and emit the missing ones):

>>> def f(iterator):
...     yield ",".join(map(str, iterator))

Now you can use an RDD operation to run this function on each partition:

>>> r1 = r.mapPartitions(f)
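
As a sketch of the "sort, loop through and emit missing ones" variant, a
per-partition function could look like the following. The name
missing_in_partition is only illustrative, and it assumes one partition's ids
fit in memory so they can be sorted locally.

```python
def missing_in_partition(iterator):
    """Yield the ids absent between the smallest and largest id of one partition."""
    ids = sorted(iterator)  # assumes the partition fits in memory
    for prev, cur in zip(ids, ids[1:]):
        # Every integer strictly between two neighbouring ids is missing.
        for missing in range(prev + 1, cur):
            yield missing


# Plain-Python check on one partition's worth of data.
print(list(missing_in_partition(iter([6, 8, 5]))))  # [7]

# With Spark it would be used the same way as f:
#   r1 = r.mapPartitions(missing_in_partition)
```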

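Once f emits the gaps instead of concatenating, r1 holds the values missing
within each partition. You can then write them out to a file, for example
with r1.saveAsTextFile. Gaps that fall between two partitions are not covered
by this and need a separate check of each partition's first and last value.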
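
The per-partition pass cannot see gaps that straddle two partitions. One way
to catch those, sketched here under the assumption that every partition holds
one contiguous id range, is to collect only the (min, max) pair of each
partition and compare neighbours on the driver. The names partition_bounds,
boundary_gaps and ranged_ids are hypothetical.

```python
def partition_bounds(iterator):
    """Yield one (min_id, max_id) pair for a non-empty partition."""
    ids = list(iterator)
    if ids:
        yield (min(ids), max(ids))


def boundary_gaps(bounds):
    """Yield the ids missing between neighbouring partitions' ranges."""
    ordered = sorted(bounds)
    for (_, prev_max), (next_min, _) in zip(ordered, ordered[1:]):
        for missing in range(prev_max + 1, next_min):
            yield missing


# Plain-Python check: three range partitions of the sample numbers.
partitions = [[1, 3, 4], [6, 8, 5], [11, 12]]
bounds = [b for p in partitions for b in partition_bounds(iter(p))]
print(bounds)                       # [(1, 4), (5, 8), (11, 12)]
print(list(boundary_gaps(bounds)))  # [9, 10]

# With Spark, only one small pair per partition reaches the driver:
#   bounds = ranged_ids.mapPartitions(partition_bounds).collect()
```

This does not work with the default slicing of sc.parallelize above, because
those partitions are not split by id range.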

On Mon, Sep 19, 2016 at 4:39 PM, Sudhindra Magadi <smag...@gmail.com> wrote:

> that is correct
>
> On Mon, Sep 19, 2016 at 12:09 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Ok, so if you see
>>
>> 1,3,4,6.....
>>
>> Will you say 2,5 are missing?
>>
>> On Mon, Sep 19, 2016 at 4:15 PM, Sudhindra Magadi <smag...@gmail.com>
>> wrote:
>>
>>> Each record has a sequence id. There are no duplicates.
>>>
>>> On Mon, Sep 19, 2016 at 11:42 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> And how do you define missing sequence? Can you give an example?
>>>>
>>>> On Mon, Sep 19, 2016 at 3:48 PM, Sudhindra Magadi <smag...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jorn,
>>>>> We have a file with a billion records. We want to find out whether any
>>>>> sequence ids are missing and, if so, which ones.
>>>>> Thanks
>>>>> Sudhindra
>>>>>
>>>>> On Mon, Sep 19, 2016 at 11:12 AM, Jörn Franke <jornfra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I am not sure what you are trying to achieve here. Can you please tell
>>>>>> us what the goal of the program is, maybe with some example data?
>>>>>>
>>>>>> Besides this, I have the feeling that it will fail once it is not
>>>>>> used in a single node scenario due to the reference to the global counter
>>>>>> variable.
>>>>>>
>>>>>> Also unclear why you collect the data first to parallelize it again.
>>>>>>
>>>>>> On 18 Sep 2016, at 14:26, sudhindra <smag...@gmail.com> wrote:
>>>>>>
>>>>>> Hi, I have coded something like this. Please tell me how bad it is.
>>>>>>
>>>>>> package Spark.spark;
>>>>>> import java.util.List;
>>>>>> import java.util.function.Function;
>>>>>>
>>>>>> import org.apache.spark.SparkConf;
>>>>>> import org.apache.spark.SparkContext;
>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>> import org.apache.spark.api.java.JavaSparkContext;
>>>>>> import org.apache.spark.sql.DataFrame;
>>>>>> import org.apache.spark.sql.Dataset;
>>>>>> import org.apache.spark.sql.Row;
>>>>>> import org.apache.spark.sql.SQLContext;
>>>>>>
>>>>>>
>>>>>>
>>>>>> public class App
>>>>>> {
>>>>>>    static long counter=1;
>>>>>>    public static void main( String[] args )
>>>>>>    {
>>>>>>
>>>>>>
>>>>>>
>>>>>>        SparkConf conf = new SparkConf().setAppName("sorter").setMaster("local[2]")
>>>>>>                .set("spark.executor.memory","1g");
>>>>>>        JavaSparkContext sc = new JavaSparkContext(conf);
>>>>>>
>>>>>>        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>>>>>
>>>>>>        DataFrame df = sqlContext.read().json("path");
>>>>>>        DataFrame sortedDF = df.sort("id");
>>>>>>        //df.show();
>>>>>>        //sortedDF.printSchema();
>>>>>>
>>>>>>        System.out.println(sortedDF.collectAsList().toString());
>>>>>>        JavaRDD<Row> distData = sc.parallelize(sortedDF.collectAsList());
>>>>>>
>>>>>>
>>>>>>     List<String >missingNumbers=distData.map(new
>>>>>> org.apache.spark.api.java.function.Function<Row, String>() {
>>>>>>
>>>>>>
>>>>>>            public String call(Row arg0) throws Exception {
>>>>>>                // TODO Auto-generated method stub
>>>>>>
>>>>>>
>>>>>>                if(counter!=new Integer(arg0.getString(0)).intValue())
>>>>>>                {
>>>>>>                    StringBuffer misses = new StringBuffer();
>>>>>>                    long newCounter=counter;
>>>>>>                    while(newCounter!=new
>>>>>> Integer(arg0.getString(0)).intValue())
>>>>>>                    {
>>>>>>                        misses.append(new String(new Integer((int)
>>>>>> counter).toString()) );
>>>>>>                        newCounter++;
>>>>>>
>>>>>>                    }
>>>>>>                    counter=new Integer(arg0.getString(0)).intValue()+1;
>>>>>>                    return misses.toString();
>>>>>>
>>>>>>                }
>>>>>>                counter++;
>>>>>>                return null;
>>>>>>
>>>>>>
>>>>>>
>>>>>>            }
>>>>>>        }).collect();
>>>>>>
>>>>>>
>>>>>>
>>>>>>        for (String name: missingNumbers) {
>>>>>>              System.out.println(name);
>>>>>>            }
>>>>>>
>>>>>>
>>>>>>
>>>>>>    }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/filling-missing-values-in-a-sequence-tp5708p27748.html
>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sudhindra S Magadi
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sudhindra S Magadi
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Thanks & Regards
> Sudhindra S Magadi
>



-- 
Best Regards,
Ayan Guha
