Repository: incubator-madlib Updated Branches: refs/heads/master 2f1c4b288 -> c82c0f382
Feature: Add grouping to weakly connected components JIRA: MADLIB-1083 Add grouping support to weakly connected components. Make necessary changes in the queries involved, docs, and install check. Closes #147 Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/c82c0f38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/c82c0f38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/c82c0f38 Branch: refs/heads/master Commit: c82c0f3823425239bc755e155f6d23ef6eb266af Parents: 2f1c4b2 Author: Nandish Jayaram <njaya...@apache.org> Authored: Thu Jun 29 10:17:01 2017 -0700 Committer: Nandish Jayaram <njaya...@apache.org> Committed: Thu Jul 6 13:51:33 2017 -0700 ---------------------------------------------------------------------- doc/design/figures/wcc_example.pdf | 163 +++++++++++++++++++ doc/design/modules/graph.tex | 111 +++++++++++++ doc/literature.bib | 10 ++ doc/mainpage.dox.in | 3 + .../postgres/modules/graph/test/wcc.sql_in | 46 +++++- src/ports/postgres/modules/graph/wcc.py_in | 146 ++++++++++++++--- src/ports/postgres/modules/graph/wcc.sql_in | 52 +++--- 7 files changed, 479 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/design/figures/wcc_example.pdf ---------------------------------------------------------------------- diff --git a/doc/design/figures/wcc_example.pdf b/doc/design/figures/wcc_example.pdf new file mode 100644 index 0000000..a57cd5e --- /dev/null +++ b/doc/design/figures/wcc_example.pdf @@ -0,0 +1,163 @@ +%PDF-1.4 +%���� +1 0 obj + << + /Title () + /Author () + /Subject () + /Keywords () + /Creator (yExport 1.5) + /Producer (org.freehep.graphicsio.pdf.YPDFGraphics2D 1.5) + /CreationDate (D:20170703114556-07'00') + /ModDate (D:20170703114556-07'00') + /Trapped /False + >> +endobj +2 0 obj + << + /Type 
/Catalog + /Pages 3 0 R + /ViewerPreferences 4 0 R + /OpenAction [5 0 R /Fit] + >> +endobj +4 0 obj + << + /FitWindow true + /CenterWindow false + >> +endobj +5 0 obj + << + /Parent 3 0 R + /Type /Page + /Contents 6 0 R + >> +endobj +6 0 obj + << + /Length 7 0 R + /Filter [/ASCII85Decode /FlateDecode] + >> +stream +Gb!SobDpGMD4T9[g`P`#4\._j)&Y-d5s'^XK*(B\e.%T9rrI0m;VHJi.X0dC:Q2;q&*g<\85U)?c%dbj +ao;1'DEiTm2d#p`IscVArpnr"pPId7C?iirh\H]b\!K17\!LH.rr(qCNR,B;le?pfnfR\^?iTi814K/I +?U+PgTE"DD5Pk>6^GLdsr[4lAW11i@5O.B.GHREJC&!0(qQEWmWTF955F!t?%PA[E\%-d"oY]RU7Y=k5 +qYnq6]\p8Q[Im@Zc[Wl[r\#pSq558&djEs2n[nFC1ep*OKDn+54nRXQXA3p:#F8>M1;6agr/dF.8Z;8H +Y:m;DUR+,IDj9q;(NUUX#*PX\j2s<Bc1P)gBY.-M45\q@hW`c::"c7qrq')WNiKMi;ia6]NpE4UR'8G^ +7r'4)FS&<JoCGi@MUR\,nMj.LS]mk)$f:e<*Eq=YON24M5[_`Q!rn:%^MWmm..HQWcbDV;V_[O3(P-'d +db@2,d`+utU@Y)ed2Sq5S'29;Ne0$0)go+I@b!=sDNpn[ki-l38X^WqT6D_<CqIUYJKa\+8$a_toOpMg +ctoX$EuJd9Ys"J\+.[Gp+K^&*E,Xg!p'jPI=q?`/Ps[$8"6ZEiNn-K!]pc+6:-*"L>SAFrX10K<&Y+Hp +;!104gCBe;W+$]2%2-dVD6<C*ZEW0^=X_$+3f#5)j/VM^YYNq:_I`#!;;Vg2Vcct[9&Ze,H87U;949UK +m+s$U',q[&"*4R6#n2h@+gT)&Vn7CN%DL>\^s><$hY]2;[58fJ.aO3CkM4WKGN[bBmX)'bl@Et]2W6E& +^"A/$:[S1rrgeP5s/YRCF2W),(KWptmaurW`Vm"cP;u]gm%K(OA\.H?\Ud\XcRI>&Dd%NueJPDIb'N7J +<<$T)EDoLnAqM]O0)263D-L9"*8@t;\fAr>[bm8<,EV</YW8XF>2UYbV>U\6_RUE5*B9N921u7S(oX#S +_4CF-?k]Amo.AGrr.Rip]$RM$.gs+_Qu:<Uc%jNYo[1[\k(U&Q4@hA3=&5V^/g1J)l,r+$No4&7FSHM> +Y?XTH*eb;#ABKUd@0p)&9a*0%B#n56ak<F@6h%1c%V7'IG>4qekGTB,Eh,$_,]L5[)n<^k!p.:iQaBQ3 +:g\:S%RWH0l<9WOkm6J4B,N[]4]bLPCj-ahV+jJjH"e#%pU\Z`MojPDUX3H(US1%/bhqi3qt`=$m[*Z( +ch"[sjedriI7IK6U'iROV@/C\&Yo.kREJ=eKmcIL1>V^X?*%%9<,Rpu*d$p7T*mZVgGNKdN?^Yd>b)BZ +/jLcWq6j;CR9Cpif6]X9LT@!AD"$o9hrhblZJl`R*YgaA8F/(ok.%Y*=8e)en`sE;o58&2Ef$s50,u^; +=qH'NK0!,fg<lQs\9:>og2E5Sb1Gb[p6[H\,MigB3uX&X""stOnh0BpDrXZY0c7T"fS*g)pU"cN!GP.u +pgWLW5Tqk@cp>8S"Pm1G;:!Y3(rnVhFTWPR=_d)J/R(fba[Tpm<!IBFNB/6bT1c9,beU2=ZMOt(n5X]) +lLMCL.!$SdTKpUrc7q7HL.BQEq$DP+D>@&.Ms`ZQ`"`m()tuS?_N6g(5%a&OHMRt*rOap2QcULO/`FmE 
+T4pHN73e(="#Gd4:Qfui6Bhh"?:X3oAb<,UKF<XE0$$-N6EcWgb]D:O5!oM:Fg[.R).@pkHi3LA"3LDi +^PdqtqKEhQieIVQs1UtoNA=l:DZ;?C0E,8L``Eu6=>`S07O`Pqc]'X>B[_6RKV;)*Rb`<]ft8.I-tYbW +Ba_WF>;"qUSa]NBn6q&`P`^2D/Wrq;a!NODa=IeRA_h$EVQqq/$W8HI30M@,Gk!/*atT5!)Re`M;+;G& +n<QD]A"culD<,b;*9CZEU>uKV5oGd#7;9iUTqUN[<]s<4X7a$A!Y!"&;K4i*Z;,,BMM;P$W?lj9"9q2' +@RK"d;<YK\UQZb?:p&5>/OY%'$YLa2*gn/PCWr!Y$&MO&9.t-,Q`5sc1b>BZ&FT8^ap?>=\PX1.()+2& +rTAHJ06du0gE"Cl*T<K$<l*2/ZSZ4,)^rlrR+k^U>bfH+&X5mU95Ir3K9Xi]*0i;R-ueGdboUq_#BFl` +H5X,'k?DU'2(aaNDAAA5KiL3m)3f=toZC9_kW.oi2+k%2H@QI6A[QD]dndDufpD89p%pba-=>Y_5-[Rj ++$tZSl`Q/"`<1:JNGW/D/XsR28$=t...@ohe.ld=.kW=2$ROE7p3p:mN'XZM;G>AN-8Wb.G;^mPj-PD[%dN +qMbmGkl6XbccRK8-2NTUhm+Wu2_J@0h`(`N[alGs`kWt:7]6+nf9MsP@"jLu,n-<W6B?X3L+(0u++BV. +,B:(/i(WJ+/e/gWQ'u4lN.=pQ"]i2@L<c'N60(Gne]3ZM#H0sJ;8#m650@0;MZ_2L,V,Tk[8eEZ@LF3N +,Kqb'0^M[17"D#6S',!sok!1M>o`95Lad?6<N0c8ReoE(;n7kPiZs]neqXHB%r4S5!\)W;7&L_m#Qfj0 +nr;05!d9`Wcnb1c)>%.!%]M=K!7g),;7r/pGZK+0=lGn:=Q`p#O5OQ:>%FKROqqeRfgNoGY;R$'Ic<!* +QhO+uo?c#O!h'rV;&:/^n)u,W;EH[oC0q=,p"cHW9"IL8NNugbg>'L;"_%eZ.HSQN(>i4O7eK6MGJ6lG +cu:u?N;#8sYjG%i,H^O><YbcK$c;q[Z$B%-rm)(TB$0B;cRtP4>C#^fG.!EK/*dlgp$dG=dj.1=*#1tG +7lC4:iM=E]J(7cgL>_#aVBH=UDI^:dN@L]L6h#TU1KRr&c^%\Ms'^'sVSipl?6FJP(oAqH1?H\-qtHnV +6ti$iLAqpJc.-%`rb6otn:(i=n:(fVqELZgJ`-P:A_1j'3ZQO.m9I';C&!#fAn*k!T?5E#?9YGKceSpk +*ZV8RH6>&:ga68Qk&/@3AES"!X"*HI0fj7CcA5T8l<E40lH0S+as\X`0aCRZ$XknKOa`=:h/#pu'Re&( +3-TW&01Z/F9\qb`L$fmppbe...@cgno.zbf,4,d1lLJij[]sPXcG5NEVcU,H1\([hPY-18B[/3Z]f3GU +5@/^lFkWlMJN9jcE,+R7nq<:K"!'2SNL4m!n_PC#,<o%.SC?-aC1tc/R`0@B(tGiO?Cm.FpBERRi1NL+ +W71S=Bu(u`n=?/eib=.Q:!43K-S!slgpjYOLmMT0QR5euSdVh_#b4#\9_CNkL8]+Vd#mWJkB#-qBF#7, +m.PM_>$q<qou(+9Q('Ba*cCh#d=k-+ksF[JkX+QoZBh!($i>r"V;1hu?*%%9[,!-7q*4b>6`PU7gheU6 +qhUeG:@BpU7BX&\C\(eHA+a)"V#$EK4P*d3jMspb*q-96?MNHdkE^)GaS*R?D>a`Qmp<0mpo<lg#!UQ8 +4[tEh02T#9s$s`sYPF7SPZYT0kT!jeG2qdggZRs1If2A!`(GNA>8FTMk;)<u=<c)=1iHNYU:IV:+IQ<R +KiYXfYidd"ZJ>5X/82.(UZFsJmX1X1ktgQ,[=1>[7"gX?q7[*,R[C`Xj[2sG3hl2=8J(^db(h<4j<'Wt 
+9oTT>YC(.#Q&go-q#KO/2$riQBB94Tl)K,=<R%SQ!L\\4Pn2ePcd@@&,q:"h)jJ#c*ud`3K<smDI#sPn +g'1SP]B800ji_=)j<'WtT[qE2LSjJsM;gS`?G1Aj/ejS3NEQ&WP-HJY5ZuFU#M,U"]NC>c=*[pDJ:)1< +7-+OAj.'34K4d2FjT@U5D,icM:r->#&$2i'C*S:Rj[1h2nsrghE)baPHV=)FYDV5+^FF[O8&\91H9EZ+ +ILDeG)#M"eT=?fNotAe+LV?h,*r^Ut"c88sN;!5`(c/JXj>03b7M4[.=-5[-e,0!ar"5da_Pe]aAV>T" +^<5"c)cOC+*,#IX7#cs;rh=&@AjB5+G`Wfrf_n-?/Ff%bf@PM"bGWb0c=?8ArXR"4EUjp=C#.,d`>UE& +8+nU,M'bj\r^nKC4,JX*Zrk*7<BXNmRHlfGd'(^#>KA2(jPb<K$$Cf.IqsI]">[Ml@ecKI!h`g4.nsBc +:]@2A8QbdXX>=muID$B[Ppb-%4u.jE!6LcOErMd2UH2hQZ>[a,mKU3RK$1h$02\icTcm7*8[NjVl8JuX +,ee0F3QQ-25B`ADf<;56#&Mf6m11*R*liGriDg=$#,lJ1#I1_OJ"=gQVTW%j>Q;dhU'ElTJ#EsO&A6lm +o1,'hG\]5ScJZRl2J7OA%HlMS#/CNc8RU8mDr8<6r9<&_0^i@sLf"$R2!YU`n"nkhep,Xc])R!^<S5Hj +rCc66@^/YIrhW)&2S8o\l2HEF`63(P+,GfPgV6]W+^uMS#OMIrjaFKDk5&!tAe`93p,hn9Z,WX<d/CH7 +AGWnBhERDL:uKYpj8Qle9PTCX:SrnaG#<Ma+<R>g*&g@,?u&!QIrPF:rr;sFb<Ob]Ig:7?"KM~> +endstream +endobj +7 0 obj + 4449 +endobj +3 0 obj + << + /Parent null + /Type /Pages + /MediaBox [0.0000 0.0000 307.00 279.00] + /Resources 8 0 R + /Kids [5 0 R] + /Count 1 + >> +endobj +9 0 obj + [/PDF /Text /ImageC] +endobj +10 0 obj + << + /S /Transparency + /CS /DeviceRGB + /I true + /K false + >> +endobj +11 0 obj + << + /Alpha1 + << + /ca 1.0000 + /CA 1.0000 + /BM /Normal + /AIS false + >> + >> +endobj +8 0 obj + << + /ProcSet 9 0 R + /ExtGState 11 0 R + >> +endobj +xref +0 12 +0000000000 65535 f +0000000015 00000 n +0000000315 00000 n +0000005191 00000 n +0000000445 00000 n +0000000521 00000 n +0000000609 00000 n +0000005168 00000 n +0000005645 00000 n +0000005361 00000 n +0000005400 00000 n +0000005502 00000 n +trailer +<< + /Size 12 + /Root 2 0 R + /Info 1 0 R +>> +startxref +5718 +%%EOF http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/design/modules/graph.tex ---------------------------------------------------------------------- diff --git a/doc/design/modules/graph.tex b/doc/design/modules/graph.tex index ec46842..f9183b3 100644 --- 
a/doc/design/modules/graph.tex +++ b/doc/design/modules/graph.tex @@ -29,6 +29,7 @@ \item[v0.2] Graph Framework, SSSP implementation details. \item[v0.3] PageRank \item[v0.4] APSP + \item[v0.5] Weakly Connected Components \end{modulehistory} \end{moduleinfo} @@ -505,3 +506,113 @@ a high $threshold$ value would lead to early termination of PageRank computation, thus resulting in incorrect PageRank values. +\section{Weakly Connected Components} \label{sec:graph:wcc} +\begin{figure}[h] + \centering + \includegraphics[width=0.5\textwidth]{figures/wcc_example.pdf} +\caption{An example disconnected directed graph} +\label{wcc:example} +\end{figure} + +Given a directed graph $G$, a weakly connected component is a maximal subgraph +$G_{sub}$ of $G$, such that there exists a path from every vertex in $G_{sub}$ +to every other vertex in $G_{sub}$, ignoring the direction of the edges. + +The weakly connected component module implemented in MADlib is based on +GRAIL~\cite{grail}. All vertices are initialized with their own vertex +ID as the component ID, and are considered to be active. In every iteration, +each vertex's component ID is replaced by the smallest component ID among its +active neighbors, whenever that value is smaller than its current component ID. +Vertices whose component ID is not updated become inactive for the next iteration. +Execution continues until there are no active vertices left. Since each vertex +is initialized with its own ID as the component ID, and updated based on +neighboring nodes' component IDs, the final component ID of a component will +be equal to the smallest vertex ID in the corresponding subgraph. +Figure~\ref{wcc:example} shows an example directed graph with two disconnected +subgraphs. The subgraph containing vertices $1$, $2$, $3$, $4$, $5$ and $6$ +forms a weakly connected component, and is assigned component ID 1, while the +subgraph containing vertices $12$, $14$, $21$ and $23$ forms the second component +and is assigned component ID 12.
+ +\subsection{Implementation Details} \label{sec:wcc:implementation} + +In this section, we discuss the MADlib implementation of weakly connected +components in depth. We maintain the following tables at every iteration: +$oldupdate$, $toupdate$, $message$ and $newupdate$. In $newupdate$, the component ID +of each vertex is initialized to infinity (in practice, the maximum integer value), while the component ID of vertices +in the $message$ table is initialized to their corresponding vertex ID. + +\begin{algorithm}[Weakly Connected Components$(V,E)$] \label{alg:wcc:high} +\begin{algorithmic}[1] + \State Create $newupdate$ table with a default component ID of + $\infty$ for every vertex + \State Create $message$ table with a default component ID of the + corresponding $id$ (vertex ID) for every vertex + \Repeat + \State Update the $oldupdate$ table + \State Update $toupdate$ table with active vertices + \State Update the $newupdate$ table + \State Update $message$ table with potential new component IDs for each vertex + \Until {There are no active vertices in $toupdate$ table} +\end{algorithmic} +\end{algorithm} + +For each vertex, the $message$ table holds the component IDs received from its +active immediate neighbors. At each iteration, $oldupdate$ is updated with the +minimum of all the associated component IDs found for a vertex in $message$. + +\begin{algorithm}[Update oldupdate table] +\begin{lstlisting} +SELECT id, MIN(message.component_id) as component_id +FROM message +GROUP BY id +\end{lstlisting} +\end{algorithm} + +Table $toupdate$ records all vertices whose component IDs must be updated, +and are thus marked active.
+ +\begin{algorithm}[Update toupdate and newupdate tables] +\begin{lstlisting} +-- Find vertices whose component ID must be updated +CREATE TABLE toupdate AS +SELECT oldupdate.id, oldupdate.component_id +FROM oldupdate, newupdate +WHERE oldupdate.id = newupdate.id AND + oldupdate.component_id < newupdate.component_id + +-- Update the component IDs +UPDATE newupdate SET +component_id = toupdate.component_id +FROM toupdate +WHERE newupdate.id = toupdate.id +\end{lstlisting} +\end{algorithm} + +Finally, the $message$ table is updated with potential new +component IDs for the neighbors of active vertices using the following query: + +\begin{algorithm}[Update message table] +\begin{lstlisting} +CREATE TEMP TABLE message AS +SELECT id, MIN(component_id) AS component_id +FROM ( + SELECT edge.src AS id, + toupdate.component_id + FROM toupdate, edge + WHERE edge.dest = toupdate.id + UNION ALL + SELECT edge.dest AS id, + toupdate.component_id + FROM toupdate, edge + WHERE edge.src = toupdate.id +) AS t +GROUP BY id +\end{lstlisting} +\end{algorithm} + +At the end of the computation, $newupdate$ will have the component ID +associated with each vertex in $G$. The component ID of all the vertices +in a component is equal to the smallest vertex ID in the corresponding +subgraph. + http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/literature.bib ---------------------------------------------------------------------- diff --git a/doc/literature.bib b/doc/literature.bib index d6941c4..08fb2dd 100644 --- a/doc/literature.bib +++ b/doc/literature.bib @@ -921,3 +921,13 @@ Applied Survival Analysis}, howpublished = {\url{http://users.cecs.anu.edu.au/~Alistair.Rendell/Teaching/apac_comp3600/module4/all_pairs_shortest_paths.xhtml}}, note = {Accessed: 2017-06-07} } + +@inproceedings{grail, + author = {Jing Fan and + Adalbert Gerald Soosai Raj and + Jignesh M.
Patel}, + title = {The Case Against Specialized Graph Analytics Engines}, + booktitle = {{CIDR} 2015, Seventh Biennial Conference on Innovative Data Systems + Research, Asilomar, CA, USA, January 4-7, 2015, Online Proceedings}, + year = {2015} +} http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/doc/mainpage.dox.in ---------------------------------------------------------------------- diff --git a/doc/mainpage.dox.in b/doc/mainpage.dox.in index 9922ed2..e16f9b2 100644 --- a/doc/mainpage.dox.in +++ b/doc/mainpage.dox.in @@ -135,6 +135,9 @@ complete matrix stored as a distributed table. @defgroup grp_sssp Single Source Shortest Path @ingroup grp_graph + @defgroup grp_wcc Weakly Connected Components + @ingroup grp_graph + @defgroup grp_mdl Model Evaluation @{Contains functions for evaluating accuracy and validation of predictive methods. @} @defgroup grp_validation Cross Validation http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/src/ports/postgres/modules/graph/test/wcc.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/test/wcc.sql_in b/src/ports/postgres/modules/graph/test/wcc.sql_in index f9430f1..3751eb0 100644 --- a/src/ports/postgres/modules/graph/test/wcc.sql_in +++ b/src/ports/postgres/modules/graph/test/wcc.sql_in @@ -54,6 +54,38 @@ INSERT INTO edge VALUES (3, 0, 1), (5, 6, 1), (6, 3, 1), +(10, 11, 1), +(10, 12, 1), +(11, 12, 1), +(11, 13, 1), +(12, 13, 1), +(13, 10, 1), +(15, 16, 1), +(15, 14, 1); + +DROP TABLE IF EXISTS wcc_out; +SELECT weakly_connected_components( + 'vertex', + 'vertex_id', + 'edge', + 'src=src_node,dest=dest_node', + 'wcc_out'); + +SELECT assert(relative_error(count(distinct component_id), 4) < 0.00001, + 'Weakly Connected Components: Number of components found is not 4.' 
+ ) FROM wcc_out; + +INSERT INTO edge VALUES +(0, 1, 2), +(0, 2, 2), +(1, 2, 2), +(1, 3, 2), +(2, 3, 2), +(2, 5, 2), +(2, 6, 2), +(3, 0, 2), +(5, 6, 2), +(6, 3, 2), (10, 11, 2), (10, 12, 2), (11, 12, 2), @@ -69,8 +101,16 @@ SELECT weakly_connected_components( 'vertex_id', 'edge', 'src=src_node,dest=dest_node', - 'wcc_out'); + 'wcc_out', + 'user_id'); +-- NOTE: The disconnected vertex '4' is not seen as a separate component +-- in either group. This way of handling disconnected nodes is consistent +-- with other graph modules that support grouping. At the moment (6/30/17), +-- we have no way of including disconnected nodes inside a group. +SELECT assert(relative_error(count(distinct component_id), 3) < 0.00001, + 'Weakly Connected Components: Number of components found is not 4.' + ) FROM wcc_out WHERE user_id=2; -SELECT assert(relative_error(count(distinct component_id), 4) < 0.00001, +SELECT assert(relative_error(count(distinct component_id), 3) < 0.00001, 'Weakly Connected Components: Number of components found is not 4.' 
- ) FROM wcc_out; + ) FROM wcc_out WHERE user_id=1; http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/src/ports/postgres/modules/graph/wcc.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.py_in b/src/ports/postgres/modules/graph/wcc.py_in index d07ac05..02cceeb 100644 --- a/src/ports/postgres/modules/graph/wcc.py_in +++ b/src/ports/postgres/modules/graph/wcc.py_in @@ -28,7 +28,6 @@ """ import plpy -from utilities.control import MinWarning from utilities.utilities import _assert from utilities.utilities import extract_keyvalue_params from utilities.utilities import unique_string, split_quoted_delimited_str @@ -37,7 +36,6 @@ from graph_utils import * m4_changequote(`<!', `!>') - def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, edge_params, out_table, grouping_cols_list, module_name): """ @@ -50,8 +48,14 @@ def validate_wcc_args(schema_madlib, vertex_table, vertex_id, edge_table, # to be column names in the edge_table, and not expressions! 
_assert(columns_exist_in_table(edge_table, grouping_cols_list, schema_madlib), "Weakly Connected Components error: One or more grouping columns specified do not exist!") - with MinWarning("warning"): - plpy.warning("Grouping is not currently supported at the moment.") + + +def prefix_tablename_to_colnames(table, cols_list): + return ' , '.join(["{0}.{1}".format(table, col) for col in cols_list]) + +def get_where_condition(table1, table2, cols_list): + return ' AND '.join(['{0}.{2}={1}.{2}'.format(table1, table2, col) + for col in cols_list]) def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, out_table, grouping_cols, **kwargs): @@ -97,17 +101,82 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, <!"DISTRIBUTED BY ({0})".format(vertex_id)!>) - + subq_prefixed_grouping_cols = '' + comma_toupdate_prefixed_grouping_cols = '' + comma_oldupdate_prefixed_grouping_cols = '' + old_new_update_where_condition = '' + new_to_update_where_condition = '' + edge_to_update_where_condition = '' is_hawq = m4_ifdef(<!__HAWQ__!>, <!True!>, <!False!>) INT_MAX = 2147483647 component_id = 'component_id' - plpy.execute(""" - CREATE TABLE {newupdate} AS - SELECT {vertex_id}, CAST({INT_MAX} AS INT) AS {component_id} - FROM {vertex_table} - {distribution} - """.format(**locals())) + if grouping_cols: + distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>, + <!"DISTRIBUTED BY ({0},{1})".format(grouping_cols, vertex_id)!>) + # Update some variables useful for grouping based query strings + subq = unique_string(desp='subquery') + distinct_grp_table = unique_string(desp='grptable') + plpy.execute(""" + CREATE TABLE {distinct_grp_table} AS + SELECT DISTINCT {grouping_cols} FROM {edge_table} + """.format(**locals())) + comma_toupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames(toupdate, + grouping_cols_list) + comma_oldupdate_prefixed_grouping_cols = ', ' + prefix_tablename_to_colnames( + 
oldupdate, grouping_cols_list) + subq_prefixed_grouping_cols = prefix_tablename_to_colnames(subq, + grouping_cols_list) + old_new_update_where_condition = ' AND ' + get_where_condition( + oldupdate, newupdate, grouping_cols_list) + new_to_update_where_condition = ' AND ' + get_where_condition( + newupdate, toupdate, grouping_cols_list) + edge_to_update_where_condition = ' AND ' + get_where_condition( + edge_table, toupdate, grouping_cols_list) + plpy.execute(""" + CREATE TABLE {newupdate} AS + SELECT {subq}.{vertex_id}, + CAST({INT_MAX} AS INT) AS {component_id} + {select_grouping_cols} + FROM {distinct_grp_table} INNER JOIN ( + SELECT {select_grouping_cols_clause} {src} AS {vertex_id} + FROM {edge_table} + UNION + SELECT {select_grouping_cols_clause} {dest} AS {vertex_id} + FROM {edge_table} + ) {subq} + ON {join_grouping_cols} + GROUP BY {group_by_clause} + {distribution} + """.format(select_grouping_cols=','+subq_prefixed_grouping_cols, + join_grouping_cols=get_where_condition(subq, + distinct_grp_table, grouping_cols_list), + group_by_clause='' if not grouping_cols else + subq_prefixed_grouping_cols+', {0}.{1}'.format(subq, vertex_id), + select_grouping_cols_clause='' if not grouping_cols else + grouping_cols+', ', **locals())) + plpy.execute(""" + CREATE TEMP TABLE {message} AS + SELECT {vertex_id}, + CAST({vertex_id} AS INT) AS {component_id} + {select_grouping_cols_clause} + FROM {newupdate} + {distribution} + """.format(select_grouping_cols_clause='' if not grouping_cols else + ', '+grouping_cols, **locals())) + else: + plpy.execute(""" + CREATE TABLE {newupdate} AS + SELECT {vertex_id}, CAST({INT_MAX} AS INT) AS {component_id} + FROM {vertex_table} + {distribution} + """.format(**locals())) + plpy.execute(""" + CREATE TEMP TABLE {message} AS + SELECT {vertex_id}, CAST({vertex_id} AS INT) AS {component_id} + FROM {vertex_table} + {distribution} + """.format(**locals())) if is_hawq: plpy.execute(""" CREATE TABLE {temp_out_table} AS @@ -115,12 +184,6 @@ def 
wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, LIMIT 0 {distribution} """.format(**locals())) - plpy.execute(""" - CREATE TEMP TABLE {message} AS - SELECT {vertex_id}, CAST({vertex_id} AS INT) AS {component_id} - FROM {vertex_table} - {distribution} - """.format(**locals())) nodes_to_update = 1 while nodes_to_update > 0: # This idea here is simple. Look at all the neighbors of a node, and @@ -135,18 +198,25 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, CREATE TEMP TABLE {oldupdate} AS SELECT {message}.{vertex_id}, MIN({message}.{component_id}) AS {component_id} + {grouping_cols_select} FROM {message} - GROUP BY {vertex_id} + GROUP BY {group_by_clause} {vertex_id} {distribution} - """.format(**locals())) + """.format(grouping_cols_select='' if not grouping_cols else + ', {0}'.format(grouping_cols), group_by_clause='' + if not grouping_cols else '{0}, '.format(grouping_cols), + **locals())) plpy.execute("DROP TABLE IF EXISTS {0}".format(toupdate)) plpy.execute(""" CREATE TEMP TABLE {toupdate} AS - SELECT {oldupdate}.{vertex_id}, {oldupdate}.{component_id} + SELECT {oldupdate}.{vertex_id}, + {oldupdate}.{component_id} + {comma_oldupdate_prefixed_grouping_cols} FROM {oldupdate}, {newupdate} WHERE {oldupdate}.{vertex_id}={newupdate}.{vertex_id} AND {oldupdate}.{component_id}<{newupdate}.{component_id} + {old_new_update_where_condition} {distribution} """.format(**locals())) @@ -160,6 +230,7 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, SELECT * FROM {toupdate} WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id} + {new_to_update_where_condition} ) UNION SELECT * FROM {toupdate}; @@ -179,28 +250,49 @@ def wcc(schema_madlib, vertex_table, vertex_id, edge_table, edge_args, {component_id}={toupdate}.{component_id} FROM {toupdate} WHERE {newupdate}.{vertex_id}={toupdate}.{vertex_id} + {new_to_update_where_condition} """.format(**locals())) plpy.execute("DROP TABLE IF EXISTS {0}".format(message)) 
plpy.execute(""" CREATE TEMP TABLE {message} AS SELECT {vertex_id}, MIN({component_id}) AS {component_id} + {select_grouping_cols} FROM ( - SELECT {edge_table}.{src} AS {vertex_id}, {toupdate}.{component_id} + SELECT {edge_table}.{src} AS {vertex_id}, + {toupdate}.{component_id} + {comma_toupdate_prefixed_grouping_cols} FROM {toupdate}, {edge_table} WHERE {edge_table}.{dest} = {toupdate}.{vertex_id} + {edge_to_update_where_condition} UNION ALL - SELECT {edge_table}.{dest} AS {vertex_id}, {toupdate}.{component_id} + SELECT {edge_table}.{dest} AS {vertex_id}, + {toupdate}.{component_id} + {comma_toupdate_prefixed_grouping_cols} FROM {toupdate}, {edge_table} WHERE {edge_table}.{src} = {toupdate}.{vertex_id} + {edge_to_update_where_condition} ) AS t - GROUP BY {vertex_id} - """.format(**locals())) + GROUP BY {group_by_clause} {vertex_id} + """.format(select_grouping_cols='' if not grouping_cols + else ', {0}'.format(grouping_cols), group_by_clause='' + if not grouping_cols else ' {0}, '.format(grouping_cols), + **locals())) plpy.execute("DROP TABLE {0}".format(oldupdate)) - nodes_to_update = plpy.execute(""" - SELECT COUNT(*) AS cnt FROM {toupdate} - """.format(**locals()))[0]["cnt"] + if grouping_cols: + nodes_to_update = plpy.execute(""" + SELECT SUM(cnt) AS cnt_sum + FROM ( + SELECT COUNT(*) AS cnt + FROM {toupdate} + GROUP BY {grouping_cols} + ) t + """.format(**locals()))[0]["cnt_sum"] + else: + nodes_to_update = plpy.execute(""" + SELECT COUNT(*) AS cnt FROM {toupdate} + """.format(**locals()))[0]["cnt"] plpy.execute("ALTER TABLE {0} RENAME TO {1}".format(newupdate, out_table)) plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3} http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/c82c0f38/src/ports/postgres/modules/graph/wcc.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/graph/wcc.sql_in b/src/ports/postgres/modules/graph/wcc.sql_in index af20281..a02db55 100644 --- 
a/src/ports/postgres/modules/graph/wcc.sql_in +++ b/src/ports/postgres/modules/graph/wcc.sql_in @@ -35,9 +35,7 @@ m4_include(`SQLCommon.m4') <div class="toc"><b>Contents</b> <ul> <li><a href="#wcc">Weakly Connected Components</a></li> -<li><a href="#notes">Notes</a></li> <li><a href="#examples">Examples</a></li> -<li><a href="#literature">Literature</a></li> </ul> </div> @@ -89,24 +87,23 @@ this string argument: It will contain a row for every vertex from 'vertex_table' with the following columns: - vertex_id : The id of a vertex. Will use the input parameter 'vertex_id' for column naming. - - component_id : The vertex's component. + - component_id : Component that the vertex belongs to. + We use the convention where 'component_id' is the smallest + vertex id in that component. This means that component ids + are generally not contiguous. - grouping_cols : Grouping column (if any) values associated with the vertex_id.</dd> <dt>grouping_cols (optional)</dt> <dd>TEXT, default: NULL. A single column or a list of comma-separated -columns that divides the input data into discrete groups, resulting in one -distribution per group. When this value is NULL, no grouping is used and -a single model is generated for all data. -@note Grouping is not currently supported at the moment.</dd> +columns that divides the input data into discrete groups, which are +treated independently as separate graphs. +When this value is NULL, no grouping is used and +weakly connected components are generated for all data +(single graph). +@note Expressions are not currently supported for 'grouping_cols'.</dd> </dl> -@anchor notes -@par Notes - -See the Grail project [1] for more background on graph analytics processing -in relational databases.
- @anchor examples @examp @@ -191,7 +188,7 @@ SELECT * FROM wcc_out ORDER BY component_id, id; -# Now all the weakly connected components associated with each user using the grouping feature: <pre class="syntax"> -DROP TABLE IF EXISTS wcc_out, wcc_out_summary; +DROP TABLE IF EXISTS wcc_out; SELECT madlib.weakly_connected_components( 'vertex', -- Vertex table 'id', -- Vertix id column @@ -202,16 +199,27 @@ SELECT madlib.weakly_connected_components( SELECT * FROM wcc_out ORDER BY user_id, component_id, id; </pre> <pre class="result"> - user_id | id | component_id ----------+----+-------------------- - + id | component_id | user_id +----+--------------+--------- + 0 | 0 | 1 + 1 | 0 | 1 + 2 | 0 | 1 + 3 | 0 | 1 + 5 | 0 | 1 + 6 | 0 | 1 + 10 | 10 | 2 + 11 | 10 | 2 + 12 | 10 | 2 + 13 | 10 | 2 + 14 | 14 | 2 + 15 | 14 | 2 + 16 | 14 | 2 +(13 rows) </pre> +Note that vertex '4' is not identified as a separate component +in the above result. This is because disconnected nodes cannot be assigned to +a particular group with the current graph representation in MADlib. -@anchor literature -@par Literature - -[1] The case against specialized graph analytics engines, J. Fan, G. Soosai Raj, -and J. M. Patel. CIDR 2015. http://cidrdb.org/cidr2015/Papers/CIDR15_Paper20.pdf */ -------------------------------------------------------------------------