The attached patch is a proof of concept.

It can be applied to the latest CVS HEAD together with colprivs_2009011001.diff.gz.
- It reverts the unnecessary changes to parser/parse_expr.c, parser/parse_node.c
  and parser/parse_relation.c.
- It adds markColumnForSelectPriv(), which marks the proper rte->cols_sel for
  the given Var node. It is invoked from scanRTEForColumn(), expandRelAttrs()
  and transformWholeRowRef().
- markColumnForSelectPriv() uses a walker function internally, because
  there is no guarantee that every entry in RangeTblEntry->joinaliasvars
  is a Var node. However, it only walks a single Var node, not the
  whole Query tree, so I think its cost is small enough.
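
To illustrate the idea behind markColumnForSelectPriv(), here is a small
standalone sketch. It is not code from the patch: MockVar, MockRte and
mark_column_for_select() are hypothetical stand-ins for Var, RangeTblEntry
and the new function, a plain 64-bit mask replaces the Bitmapset, every join
alias is assumed to be a plain Var, and FirstLowInvalidHeapAttributeNumber is
assumed to be -8.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed value of FirstLowInvalidHeapAttributeNumber (access/sysattr.h). */
#define FIRST_LOW_INVALID_ATTNO (-8)

typedef enum { MOCK_RTE_RELATION, MOCK_RTE_JOIN } MockRteKind;

/* Hypothetical, simplified Var: range table index (1-based) and column. */
typedef struct MockVar
{
	int			varno;
	int			varattno;
} MockVar;

/* Hypothetical, simplified RangeTblEntry. */
typedef struct MockRte
{
	MockRteKind kind;
	uint64_t	cols_sel;		/* bit (attno - FIRST_LOW_INVALID_ATTNO) */
	const MockVar *joinaliasvars;	/* join only: alias column -> source Var */
	int			njoinaliasvars;
} MockRte;

/*
 * Mark the column referenced by v. A join alias is resolved to its source
 * Var first, so only plain relations ever get bits set.
 */
static void
mark_column_for_select(MockRte *rtable, const MockVar *v)
{
	MockRte    *rte = &rtable[v->varno - 1];

	if (rte->kind == MOCK_RTE_RELATION)
	{
		int			bit = v->varattno - FIRST_LOW_INVALID_ATTNO;

		/* system columns are negative, hence the offset */
		assert(bit >= 0 && bit < 64);
		rte->cols_sel |= UINT64_C(1) << bit;
	}
	else if (rte->kind == MOCK_RTE_JOIN)
	{
		assert(v->varattno >= 1 && v->varattno <= rte->njoinaliasvars);
		mark_column_for_select(rtable, &rte->joinaliasvars[v->varattno - 1]);
	}
}

/* Test whether a column of a relation RTE has been marked. */
static int
column_is_marked(const MockRte *rte, int attno)
{
	return (int) ((rte->cols_sel >> (attno - FIRST_LOW_INVALID_ATTNO)) & 1);
}
```

With a range table of {t1, t2, join}, marking the second alias column of the
join sets only the bit of the underlying t2 column; the join entry itself
stays unmarked. A whole-row reference (attribute number 0) ends up at bit 8
under the assumed offset.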

==== Results:
postgres=# CREATE TABLE t1 (a int, b text, c bool);
CREATE TABLE
postgres=# CREATE TABLE t2 (x int, y text, z bool);
CREATE TABLE
postgres=# GRANT SELECT (a,b) ON t1 TO ymj;
GRANT
postgres=# GRANT SELECT (x,y) ON t2 TO ymj;
GRANT
postgres=# \c - ymj
psql (8.4devel)
You are now connected to database "postgres" as user "ymj".
postgres=> SELECT a,b FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
 a | b
---+---
(0 rows)

postgres=> SELECT * FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.c required: 0002 allowed: 0000
ERROR:  permission denied for relation t1

postgres=> SELECT a FROM t1 order by c;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.c required: 0002 allowed: 0000
ERROR:  permission denied for relation t1

postgres=> SELECT t1 FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.c required: 0002 allowed: 0000
ERROR:  permission denied for relation t1

postgres=> SELECT 'abcd' FROM t1;
DEBUG:  pg_attribute_aclmask: t1.tableoid required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.cmax required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.xmax required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.cmin required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.xmin required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.ctid required: 0002 allowed: 0000
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
 ?column?
----------
(0 rows)

postgres=> SELECT b,y FROM t1 JOIN t2 ON a=x;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.x required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.y required: 0002 allowed: 0002
 b | y
---+---
(0 rows)

postgres=> SELECT b, (SELECT y FROM t2 WHERE x=a) FROM t1;
DEBUG:  pg_attribute_aclmask: t1.a required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t1.b required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.x required: 0002 allowed: 0002
DEBUG:  pg_attribute_aclmask: t2.y required: 0002 allowed: 0002
 b | ?column?
---+----------
(0 rows)
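
The "required"/"allowed" pairs in the DEBUG lines above suggest a per-column
mask comparison. The following standalone sketch only illustrates that
comparison; it is not the executor code from the patch. first_denied_column()
and MOCK_ACL_SELECT are hypothetical names, and columns are indexed from 0
here without the system-column offset.

```c
#include <assert.h>
#include <stdint.h>

/* Matches the "required: 0002" value printed in the log above. */
#define MOCK_ACL_SELECT 0x0002u

/*
 * Walk the marked columns and return the index of the first one whose
 * allowed mask does not cover every required bit, or -1 if all pass.
 */
static int
first_denied_column(uint64_t cols_sel, const uint32_t *allowed,
					int ncols, uint32_t required)
{
	int			i;

	assert(ncols >= 0 && ncols <= 64);
	for (i = 0; i < ncols; i++)
	{
		if (((cols_sel >> i) & 1) == 0)
			continue;			/* column not referenced by the query */
		if ((allowed[i] & required) != required)
			return i;			/* missing privilege on this column */
	}
	return -1;
}
```

For t1 with allowed masks {0002, 0002, 0000}, a query that marks only a and b
passes, while one that also marks c is rejected at index 2, which corresponds
to the "SELECT * FROM t1" failure shown above.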

Thanks,

KaiGai Kohei wrote:
> Tom Lane wrote:
>> I'm thinking make_var is not the place to do this.  The places that are
>> supposed to be taking care of permissions are the ones that do this:
>>
>>              /* Require read access --- see comments in setTargetTable() */
>>              rte->requiredPerms |= ACL_SELECT;
> 
> Indeed, that seems to me an appropriate policy, because the CLP feature is
> part of the SQL standard access control model.
> 
> 
> The rte->requiredPerms is set on the following places:
> 
> (1) transformLockingClause() and markQueryForLocking()
> They add ACL_SELECT_FOR_UPDATE for the listed relations.
> In this case, the column-level privilege feature checks ACL_UPDATE for
> each column in rte->cols_sel, doesn't it?
> So, I think nothing needs to be done here.
> 
> (2) setTargetTable()
> It is invoked from transformInsertStmt(), transformUpdateStmt() and
> transformDeleteStmt(). I think it would be the proper place to set
> rte->cols_mod, but the caller has not yet initialized Query->targetList
> when it is invoked.
> So, I think we need to put a function call that sets cols_mod on the
> caller side, based on Query->resultRelation and Query->targetList.
> 
> (3) scanRTEForColumn()
> Stephen's original patch marked columns here, but it did not handle
> RTE_JOIN well, which caused problems with table joins.
> Please restore the code that marks rte->cols_sel, with proper handling
> of the RTE_JOIN case.
> 
> (4) addRangeTableEntry()
> The purpose of this function is to translate a RangeVar node, as listed
> in the FROM clause and so on, into a RangeTblEntry node.
> I think we don't need to add any column references here.
> When the translated relation is used for column references, that is a
> case where rte->cols_sel is empty.
> 
> (5) addRangeTableEntryForRelation()
> It is similar to addRangeTableEntry().
> I think we don't need to add anything here.
> 
> (6) ExpandColumnRefStar() and ExpandAllTables()
> They invoke expandRelAttrs() to handle the "SELECT * FROM t;" case.
> I think this is a proper point to mark the referenced columns.
> Please note that there is no guarantee that the given RangeTblEntry is
> an RTE_RELATION.
> 
> Thus, we need the following reworking in my opinion.
> (a) Implement a function to set rte->cols_sel and rte->cols_mod correctly,
>     even if the given rte has RTE_JOIN.
> (b) Put a function to set rte->cols_mod on the caller of setTargetTable().
> (c) Put a function to set rte->cols_sel on scanRTEForColumn(),
>     expandRelAttrs() and transformWholeRowRef().
> 
>> It's possible that we've missed some --- in particular, right at the
>> moment I am not sure that whole-row Vars are handled properly.
> 
> Is transformWholeRowRef() the proper place to handle it?
> If the given RangeTblEntry is an RTE_JOIN, it has to be resolved so that
> cols_sel is set on its source relations.
> 
>> And maybe we could refactor a little bit to save some code.
>> But those are basically the same places that ought to be adding
>> bits to the column bitmaps.
> 
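
For point (b) in the quoted message, a rough standalone sketch of deriving
cols_mod from a target list could look as follows. This is only an
illustration and not part of the attached patch: cols_mod_from_targets() is a
hypothetical helper, the target list is reduced to an array of target column
numbers, a 64-bit mask stands in for the Bitmapset, and
FirstLowInvalidHeapAttributeNumber is assumed to be -8.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed value of FirstLowInvalidHeapAttributeNumber (access/sysattr.h). */
#define FIRST_LOW_INVALID_ATTNO (-8)

/*
 * Build a modified-columns mask from the target column numbers of an
 * INSERT or UPDATE, using the same offset as for cols_sel.
 */
static uint64_t
cols_mod_from_targets(const int *target_attnos, int ntargets)
{
	uint64_t	cols_mod = 0;
	int			i;

	for (i = 0; i < ntargets; i++)
	{
		int			bit = target_attnos[i] - FIRST_LOW_INVALID_ATTNO;

		assert(bit >= 0 && bit < 64);
		cols_mod |= UINT64_C(1) << bit;
	}
	return cols_mod;
}
```

Because the mask is computed from the finished target list, such a helper
would have to run in the callers of setTargetTable(), after the target list
has been built.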


-- 
OSS Platform Development Division, NEC
KaiGai Kohei <kai...@ak.jp.nec.com>
diff -cr a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
*** a/src/backend/parser/parse_expr.c	2009-01-13 15:54:53.000000000 +0900
--- b/src/backend/parser/parse_expr.c	2009-01-13 16:31:39.000000000 +0900
***************
*** 15,21 ****
  
  #include "postgres.h"
  
- #include "access/sysattr.h"
  #include "catalog/pg_type.h"
  #include "commands/dbcommands.h"
  #include "miscadmin.h"
--- 15,20 ----
***************
*** 1923,1937 ****
  							 toid,
  							 -1,
  							 sublevels_up);
- 
- 			/* Add this full-row reference to the RTEs cols_sel set to handle
- 			 * column-level permissions.  Note that we use a bitmapset here
- 			 * and it can only handle non-negative values, so we need to
- 			 * offset by FirstLowInvalidHeapAttributeNumber.  This will be
- 			 * undone in execMain. */
- 			rte->cols_sel = bms_add_member(rte->cols_sel,
- 										InvalidAttrNumber -
- 										FirstLowInvalidHeapAttributeNumber);
  			break;
  		case RTE_FUNCTION:
  			toid = exprType(rte->funcexpr);
--- 1922,1927 ----
***************
*** 1987,1992 ****
--- 1977,1984 ----
  	/* location is not filled in by makeVar */
  	result->location = location;
  
+ 	markColumnForSelectPriv((Node *) result, pstate);
+ 
  	return (Node *) result;
  }
  
diff -cr a/src/backend/parser/parse_node.c b/src/backend/parser/parse_node.c
*** a/src/backend/parser/parse_node.c	2009-01-13 15:54:53.000000000 +0900
--- b/src/backend/parser/parse_node.c	2009-01-13 16:31:39.000000000 +0900
***************
*** 15,21 ****
  #include "postgres.h"
  
  #include "access/heapam.h"
- #include "access/sysattr.h"
  #include "catalog/pg_type.h"
  #include "mb/pg_wchar.h"
  #include "nodes/makefuncs.h"
--- 15,20 ----
***************
*** 189,203 ****
  	get_rte_attribute_type(rte, attrno, &vartypeid, &type_mod);
  	result = makeVar(vnum, attrno, vartypeid, type_mod, sublevels_up);
  	result->location = location;
- 
- 	/* This is also a convenient place to add this column to the cols_sel
- 	 * of the RTE.  This is needed to handle column-level permissions.
- 	 * Note that we use a bitmapset here which can only handle non-negative
- 	 * values, so we have to offset it by FirstLowInvalidHeapAttributeNumber.
- 	 */
- 	rte->cols_sel = bms_add_member(rte->cols_sel, attrno -
- 									FirstLowInvalidHeapAttributeNumber);
- 
  	return result;
  }
  
--- 188,193 ----
diff -cr a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
*** a/src/backend/parser/parse_relation.c	2009-01-13 15:54:53.000000000 +0900
--- b/src/backend/parser/parse_relation.c	2009-01-13 16:31:39.000000000 +0900
***************
*** 40,50 ****
  static RangeTblEntry *scanNameSpaceForRelid(ParseState *pstate, Oid relid,
  											int location);
  static bool isLockedRel(ParseState *pstate, char *refname);
! static void expandRelation(RangeTblEntry *rte,
  			   int rtindex, int sublevels_up,
  			   int location, bool include_dropped,
  			   List **colnames, List **colvars);
! static void expandTupleDesc(TupleDesc tupdesc, RangeTblEntry *rte,
  				int rtindex, int sublevels_up,
  				int location, bool include_dropped,
  				List **colnames, List **colvars);
--- 40,50 ----
  static RangeTblEntry *scanNameSpaceForRelid(ParseState *pstate, Oid relid,
  											int location);
  static bool isLockedRel(ParseState *pstate, char *refname);
! static void expandRelation(Oid relid, Alias *eref,
  			   int rtindex, int sublevels_up,
  			   int location, bool include_dropped,
  			   List **colnames, List **colvars);
! static void expandTupleDesc(TupleDesc tupdesc, Alias *eref,
  				int rtindex, int sublevels_up,
  				int location, bool include_dropped,
  				List **colnames, List **colvars);
***************
*** 430,435 ****
--- 430,470 ----
  }
  
  /*
+  * markColumnForSelectPriv
+  *     It marks rte->cols_sel for given Var node
+  */
+ bool
+ markColumnForSelectPriv(Node *node, ParseState *pstate)
+ {
+ 	if (IsA(node, Var))
+ 	{
+ 		RangeTblEntry *rte;
+ 		ParseState *cur = pstate;
+ 		Var *v = (Var *) node;
+ 		int lv;
+ 
+ 		for (lv = v->varlevelsup; lv > 0; lv--)
+ 			cur = cur->parentParseState;
+ 
+ 		rte = rt_fetch(v->varno, cur->p_rtable);
+ 		Assert(IsA(rte, RangeTblEntry));
+ 
+ 		if (rte->rtekind == RTE_RELATION)
+ 		{
+ 			rte->cols_sel = bms_add_member(rte->cols_sel, v->varattno
+ 									- FirstLowInvalidHeapAttributeNumber);
+ 		}
+ 		else if (rte->rtekind == RTE_JOIN)
+ 		{
+ 			Node *subnode = list_nth(rte->joinaliasvars, v->varattno - 1);
+ 
+ 			markColumnForSelectPriv(subnode, cur);
+ 		}
+ 	}
+ 	return expression_tree_walker(node, markColumnForSelectPriv, (void *) pstate);
+ }
+ 
+ /*
   * scanRTEForColumn
   *	  Search the column names of a single RTE for the given name.
   *	  If found, return an appropriate Var node, else return NULL.
***************
*** 479,484 ****
--- 514,520 ----
  			result = (Node *) make_var(pstate, rte, attnum, location);
  			/* Require read access */
  			rte->requiredPerms |= ACL_SELECT;
+ 			markColumnForSelectPriv(result, pstate);
  		}
  	}
  
***************
*** 1480,1486 ****
  	{
  		case RTE_RELATION:
  			/* Ordinary relation RTE */
! 			expandRelation(rte,
  						   rtindex, sublevels_up, location,
  						   include_dropped, colnames, colvars);
  			break;
--- 1516,1522 ----
  	{
  		case RTE_RELATION:
  			/* Ordinary relation RTE */
! 			expandRelation(rte->relid, rte->eref,
  						   rtindex, sublevels_up, location,
  						   include_dropped, colnames, colvars);
  			break;
***************
*** 1538,1544 ****
  				{
  					/* Composite data type, e.g. a table's row type */
  					Assert(tupdesc);
! 					expandTupleDesc(tupdesc, rte,
  									rtindex, sublevels_up, location,
  									include_dropped, colnames, colvars);
  				}
--- 1574,1580 ----
  				{
  					/* Composite data type, e.g. a table's row type */
  					Assert(tupdesc);
! 					expandTupleDesc(tupdesc, rte->eref,
  									rtindex, sublevels_up, location,
  									include_dropped, colnames, colvars);
  				}
***************
*** 1735,1749 ****
   * expandRelation -- expandRTE subroutine
   */
  static void
! expandRelation(RangeTblEntry *rte, int rtindex, int sublevels_up,
  			   int location, bool include_dropped,
  			   List **colnames, List **colvars)
  {
  	Relation	rel;
  
  	/* Get the tupledesc and turn it over to expandTupleDesc */
! 	rel = relation_open(rte->relid, AccessShareLock);
! 	expandTupleDesc(rel->rd_att, rte, rtindex, sublevels_up,
  					location, include_dropped,
  					colnames, colvars);
  	relation_close(rel, AccessShareLock);
--- 1771,1785 ----
   * expandRelation -- expandRTE subroutine
   */
  static void
! expandRelation(Oid relid, Alias *eref, int rtindex, int sublevels_up,
  			   int location, bool include_dropped,
  			   List **colnames, List **colvars)
  {
  	Relation	rel;
  
  	/* Get the tupledesc and turn it over to expandTupleDesc */
! 	rel = relation_open(relid, AccessShareLock);
! 	expandTupleDesc(rel->rd_att, eref, rtindex, sublevels_up,
  					location, include_dropped,
  					colnames, colvars);
  	relation_close(rel, AccessShareLock);
***************
*** 1753,1764 ****
   * expandTupleDesc -- expandRTE subroutine
   */
  static void
! expandTupleDesc(TupleDesc tupdesc, RangeTblEntry *rte,
  				int rtindex, int sublevels_up,
  				int location, bool include_dropped,
  				List **colnames, List **colvars)
  {
- 	Alias		*eref = rte->eref;
  	int			maxattrs = tupdesc->natts;
  	int			numaliases = list_length(eref->colnames);
  	int			varattno;
--- 1789,1799 ----
   * expandTupleDesc -- expandRTE subroutine
   */
  static void
! expandTupleDesc(TupleDesc tupdesc, Alias *eref,
  				int rtindex, int sublevels_up,
  				int location, bool include_dropped,
  				List **colnames, List **colvars)
  {
  	int			maxattrs = tupdesc->natts;
  	int			numaliases = list_length(eref->colnames);
  	int			varattno;
***************
*** 1784,1802 ****
  			}
  			continue;
  		}
- 		else
- 		{
- 			if (rte->rtekind == RTE_RELATION)
- 			{
- 				/* Add these columns to the RTEs cols_sel bitmapset.  As a
- 				 * bitmapset can only handle non-negative values, we have to
- 				 * offset it by FirstLowInvalidHeapAttributeNumber. 
- 				 * This will be undone in execMain. */
- 				rte->cols_sel = bms_add_member(rte->cols_sel,
- 						varattno+1 -
- 						FirstLowInvalidHeapAttributeNumber);
- 			}
- 		}
  
  		if (colnames)
  		{
--- 1819,1824 ----
***************
*** 1856,1861 ****
--- 1878,1885 ----
  							 label,
  							 false);
  		te_list = lappend(te_list, te);
+ 
+ 		markColumnForSelectPriv(varnode, pstate);
  	}
  
  	Assert(name == NULL && var == NULL);	/* lists not the same length? */
diff -cr a/src/include/parser/parse_relation.h b/src/include/parser/parse_relation.h
*** a/src/include/parser/parse_relation.h	2009-01-13 15:54:53.000000000 +0900
--- b/src/include/parser/parse_relation.h	2009-01-13 16:31:39.000000000 +0900
***************
*** 36,41 ****
--- 36,43 ----
  					   int sublevels_up);
  extern CommonTableExpr *GetCTEForRTE(ParseState *pstate, RangeTblEntry *rte,
  									 int rtelevelsup);
+ extern bool markColumnForSelectPriv(Node *node, ParseState *pstate);
+ 
  extern Node *scanRTEForColumn(ParseState *pstate, RangeTblEntry *rte,
  				 char *colname, int location);
  extern Node *colNameToVar(ParseState *pstate, char *colname, bool localonly,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)