Hello Fabian,

Thank you for the response. I am stuck on how to iterate over a DataSet, perform operations on its elements, and return a new, modified DataSet, the way I would with a list. This is what I am doing currently:

    for (Centroid centroid : centroids.collect()) {
        for (Tuple2<Integer, Point> element : clusteredPoints.collect()) {
            // perform necessary operations
        }
        // add elements
    }
    // return elements list

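As Fabian notes further down the thread, two nested loops with an equality condition amount to a join. A minimal plain-Java sketch of that idea, without Flink, might look as follows: index the centroids by id once, then make a single pass over the points. The `Centroid` and `AssignedPoint` classes are hypothetical stand-ins for the KMeans example's `Centroid` and `Tuple2<Integer, Point>`, and the one-dimensional distance is an assumption made only to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NestedLoopAsJoin {

    // Hypothetical stand-in for the KMeans example's Centroid (1-D for brevity).
    static final class Centroid {
        final int id;
        final double x;
        Centroid(int id, double x) { this.id = id; this.x = x; }
    }

    // Hypothetical stand-in for Tuple2<Integer, Point>: cluster id plus a 1-D point.
    static final class AssignedPoint {
        final int clusterId;
        final double x;
        AssignedPoint(int clusterId, double x) { this.clusterId = clusterId; this.x = x; }
    }

    // Pairs every point with its own centroid and returns the distances in the
    // order the points were given. Points whose cluster id has no matching
    // centroid are skipped, as in an inner join.
    static List<Double> distancesToOwnCentroid(List<Centroid> centroids,
                                               List<AssignedPoint> points) {
        // Build the lookup side once instead of rescanning it per centroid.
        Map<Integer, Centroid> byId = new HashMap<>();
        for (Centroid c : centroids) {
            byId.put(c.id, c);
        }
        List<Double> distances = new ArrayList<>();
        for (AssignedPoint p : points) {
            Centroid c = byId.get(p.clusterId);
            if (c != null) {
                distances.add(Math.abs(p.x - c.x));
            }
        }
        return distances;
    }

    public static void main(String[] args) {
        List<Centroid> centroids = new ArrayList<>();
        centroids.add(new Centroid(1, 0.0));
        centroids.add(new Centroid(2, 10.0));

        List<AssignedPoint> points = new ArrayList<>();
        points.add(new AssignedPoint(1, 1.5));
        points.add(new AssignedPoint(2, 8.0));
        points.add(new AssignedPoint(3, 4.0)); // no centroid with id 3

        System.out.println(distancesToOwnCentroid(centroids, points));
    }
}
```

The hand-built map only illustrates what the pairing computes. In the DataSet API the same pairing would presumably be expressed as a join on the cluster id, so that the result stays a DataSet and never has to be collected to the client.
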
It would be really nice if I could just get started. I have been trying to add elements to a DataSet using *join*, but when I print the DataSet it still contains only the single initial element, with the same value it was initialized with.

    for (....) {
        newElement = new Tuple3<Integer, Point, Boolean>();
        dataSetElement.join(env.fromElements(newElement));
        dataSetElement.print();
    }

I am unsure whether I am using the right function or using join in the wrong manner.

Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 6:33 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> I would try to do the outlier computation with the DataSet API instead of
> fetching the results to the client with collect().
> If you do that, you can directly use writeAsCsv because the result is
> still a DataSet.
>
> What you have to do is translate your findOutliers method into DataSet
> API code.
>
> Best, Fabian
>
> 2016-02-10 18:29 GMT+01:00 subash basnet <yasub...@gmail.com>:
>
>> Hello Fabian,
>>
>> As written before, the code is:
>>
>>     DataSet<Tuple3> fElements =
>>         env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
>>     fElements.writeAsCsv(outputPath, "\n", " ");
>>     env.execute("KMeans Example");
>>
>> I am very new to Flink, so I am not clear about what you suggested. By
>> option (1), did you mean that I write my own FileWriter here rather than
>> using the *writeAsCsv()* method? And for option (2), I couldn't understand
>> where to compute the outliers. I would like to use the *writeAsCsv()*
>> method, but currently it doesn't perform the write operation and I am
>> unable to understand why.
>>
>> An interesting thing I found is that when I run the *outlierDetection*
>> class from Eclipse, a single file *result* gets written within the kmeans
>> folder, whereas the default *KMeans* class writes a result folder within
>> the kmeans folder, and the files with points are written inside the
>> result folder.
>> I give the necessary paths in the arguments while running.
>> E.g.:
>>     file:///home/softwares/flink-0.10.0/kmeans/points
>>     file:///home/softwares/flink-0.10.0/kmeans/centers
>>     file:///home/softwares/flink-0.10.0/kmeans/result 10
>>
>> Now, after I create the runnable jar files for the KMeans and
>> outlierDetection classes and upload them to the *Flink web submission
>> client*, it works fine for *KMeans.jar*: the folder and files get created.
>> But in the case of *outlierDetection.jar*, no file or folder gets written
>> inside kmeans.
>>
>> How is it that the outlier class is able to write the file when run from
>> Eclipse, but the outlier jar is not able to write it via the Flink web
>> submission client?
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Subash,
>>>
>>> I would not fetch the data to the client, do the computation there, and
>>> send it back, just for the purpose of writing it to a file.
>>>
>>> Either 1) pull the results to the client and write the file from there
>>> or 2) compute the outliers in the cluster.
>>> I did not study your code completely, but the two nested loops and the
>>> condition are a join, for example.
>>>
>>> I would go for option 2, if possible.
>>>
>>> Best, Fabian
>>>
>>> 2016-02-10 13:07 GMT+01:00 subash basnet <yasub...@gmail.com>:
>>>
>>>> Hello Fabian,
>>>>
>>>> I use the collect() method to get the elements locally, perform
>>>> operations on them, and return the result as a collection. The
>>>> collection result is converted to a DataSet in the calling method.
>>>> Below is the code of the *findOutliers* method:
>>>>
>>>> public static List<Tuple3> findOutliers(DataSet<Tuple2<Integer, Point>> clusteredPoints,
>>>>         DataSet<Centroid> centroids) throws Exception {
>>>>     List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>>>     List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
>>>>     List<Centroid> centroidList = centroids.collect();
>>>>     List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
>>>>             new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
>>>>     for (Centroid centroid : centroidList) {
>>>>         elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
>>>>         double totalDistance = 0;
>>>>         int elementsCount = 0;
>>>>         for (Tuple2<Integer, Point> e : elements) {
>>>>             // compute distance
>>>>             if (e.f0 == centroid.id) {
>>>>                 Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
>>>>                         new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
>>>>                 double distance = e.f1.euclideanDistance(centroid);
>>>>                 totalDistance += distance;
>>>>                 newElement.setFields(centroid, e, distance);
>>>>                 elementsWithDistance.add(newElement);
>>>>                 elementsCount++;
>>>>             }
>>>>         }
>>>>         // finding mean
>>>>         double mean = totalDistance / elementsCount;
>>>>         double sdTotalDistanceSquare = 0;
>>>>         for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
>>>>             double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
>>>>             sdTotalDistanceSquare += distanceSquare;
>>>>         }
>>>>         double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
>>>>         double upperlimit = mean + 2 * sd;
>>>>         double lowerlimit = mean - 2 * sd;
>>>>         Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>(); // true = outlier
>>>>         for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
>>>>             newElement = new Tuple3<Integer, Point,
>>>>                     Boolean>();
>>>>             if (elementWithDistance.f2 < lowerlimit || elementWithDistance.f2 > upperlimit) {
>>>>                 // set as outlier
>>>>                 newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, true);
>>>>             } else {
>>>>                 newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, false);
>>>>             }
>>>>             finalElements.add(newElement);
>>>>         }
>>>>     }
>>>>     return finalElements;
>>>> }
>>>>
>>>> I have attached a screenshot of my project structure and the
>>>> KMeansOutlierDetection.java file for more clarity.
>>>>
>>>> Best Regards,
>>>> Subash Basnet
>>>>
>>>> On Wed, Feb 10, 2016 at 12:26 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Subash,
>>>>>
>>>>> how is findOutliers implemented?
>>>>>
>>>>> It might be that you are mixing up local and cluster computation. All
>>>>> DataSets are processed in the cluster. Please note the following:
>>>>> - ExecutionEnvironment.fromCollection() transforms a client-local
>>>>> collection into a DataSet by serializing it and sending it to the cluster.
>>>>> - DataSet.collect() transforms a DataSet into a collection and ships
>>>>> it back to the client.
>>>>>
>>>>> So, does findOutliers operate on the cluster or on the local client,
>>>>> i.e., does it work with DataSets and send the result back as a
>>>>> collection, or does it first collect the results as a collection and
>>>>> operate on these?
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2016-02-10 12:13 GMT+01:00 subash basnet <yasub...@gmail.com>:
>>>>>
>>>>>> Hello Stefano,
>>>>>>
>>>>>> Yeah, the type casting worked, thank you. But I am not able to write
>>>>>> the DataSet to the file.
>>>>>>
>>>>>> The default code below, which writes the KMeans points along with
>>>>>> their centroid numbers to the file, works fine:
>>>>>>
>>>>>>     // feed new centroids back into next iteration
>>>>>>     DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>>>>>     DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>>>>         // assign points to final clusters
>>>>>>         .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>>     if (fileOutput) {
>>>>>>         clusteredPoints.writeAsCsv(outputPath, "\n", " ");
>>>>>>         // since file sinks are lazy, we trigger the execution explicitly
>>>>>>         env.execute("KMeans Example");
>>>>>>     }
>>>>>>
>>>>>> But my modified code below, to find outliers:
>>>>>>
>>>>>>     // feed new centroids back into next iteration
>>>>>>     DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
>>>>>>     DataSet<Tuple2<Integer, Point>> clusteredPoints = points
>>>>>>         // assign points to final clusters
>>>>>>         .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>>     DataSet<Tuple3> fElements =
>>>>>>         env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
>>>>>>     if (fileOutput) {
>>>>>>         fElements.writeAsCsv(outputPath, "\n", " ");
>>>>>>         // since file sinks are lazy, we trigger the execution explicitly
>>>>>>         env.execute("KMeans Example");
>>>>>>     }
>>>>>>
>>>>>> It's not writing to the file: the *result* folder does not get
>>>>>> created inside the kmeans folder where my centers and points files are
>>>>>> located.
>>>>>> I am only able to print it to the console via *fElements.print();*
>>>>>>
>>>>>> Does it have something to do with *env.execute("")*, which must be
>>>>>> set somewhere in the previous case but not in my case?
>>>>>>
>>>>>> Best Regards,
>>>>>> Subash Basnet
>>>>>>
>>>>>> On Tue, Feb 9, 2016 at 6:29 PM, Stefano Baghino <
>>>>>> stefano.bagh...@radicalbit.io> wrote:
>>>>>>
>>>>>>> Assuming your ExecutionEnvironment is named `env`, simply call:
>>>>>>>
>>>>>>>     DataSet<Tuple3<Integer, Point, Boolean>> fElements = env.fromCollection(finalElements);
>>>>>>>
>>>>>>> Does this help?
>>>>>>>
>>>>>>> On Tue, Feb 9, 2016 at 6:06 PM, subash basnet <yasub...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> I have modified the KMeans code to detect outliers.
>>>>>>>> I have printed the output to the console, but I am not able to write
>>>>>>>> it to a file using the given 'writeAsCsv' method.
>>>>>>>> The problem is that I generate a list of tuples.
>>>>>>>> My list is:
>>>>>>>>     List<Tuple3> finalElements = new ArrayList<Tuple3>();
>>>>>>>> Following is the data type of the elements added to the list:
>>>>>>>>     Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
>>>>>>>>     finalElements.add(newElement);
>>>>>>>> Now I am stuck on how to convert this 'finalElements' to
>>>>>>>>     DataSet<Tuple3<Integer, Point, Boolean>> fElements
>>>>>>>> so that I can use
>>>>>>>>     fElements.writeAsCsv(outputPath, "\n", " ");
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Subash Basnet
>>>>>>>
>>>>>>> --
>>>>>>> BR,
>>>>>>> Stefano Baghino
>>>>>>>
>>>>>>> Software Engineer @ Radicalbit
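
For context on what the Boolean in each `Tuple3<Integer, Point, Boolean>` holds: the findOutliers method quoted in this thread flags a point when its distance to its centroid falls outside mean ± 2 standard deviations for that cluster. A minimal plain-Java sketch of that rule for a single cluster, without Flink, could look like this. The class and method names are hypothetical, and the empty-input guard is an addition of this sketch that the quoted method does not have.

```java
import java.util.Arrays;

public class TwoSigmaOutliers {

    // Flags each distance that lies outside mean +/- 2 * standard deviation.
    // The standard deviation divides by n (population form), as the quoted
    // findOutliers method does.
    static boolean[] flagOutliers(double[] distances) {
        boolean[] flags = new boolean[distances.length];
        if (distances.length == 0) {
            return flags; // guard added for this sketch: avoids dividing by zero
        }

        double sum = 0;
        for (double d : distances) {
            sum += d;
        }
        double mean = sum / distances.length;

        double squaredDeviations = 0;
        for (double d : distances) {
            squaredDeviations += (d - mean) * (d - mean);
        }
        double sd = Math.sqrt(squaredDeviations / distances.length);

        double lowerLimit = mean - 2 * sd;
        double upperLimit = mean + 2 * sd;
        for (int i = 0; i < distances.length; i++) {
            flags[i] = distances[i] < lowerLimit || distances[i] > upperLimit;
        }
        return flags;
    }

    public static void main(String[] args) {
        // Nine points at distance 1.0 and one at 11.0: mean 2.0, sd 3.0,
        // so the limits are -4.0 and 8.0.
        double[] distances = {1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 11.0};
        System.out.println(Arrays.toString(flagOutliers(distances)));
    }
}
```

Because the rule needs only the distances of one cluster at a time, it has the shape of a group-then-aggregate step, which is why it can in principle run inside the cluster instead of on collected lists.
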