Hi,
As far as I can tell you have not *AT ALL* addressed that it is *NOT
SAFE* to evaluate arbitrary expressions from within an output
plugin. Despite that having been brought up multiple times.
> +static ExprState *
> +pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate)
> +{
> + ExprState *exprstate;
> + Oid exprtype;
> + Expr *expr;
> +
> + /* Prepare expression for execution */
> + exprtype = exprType(rfnode);
> + expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID,
> -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
> +
> + if (expr == NULL)
> + ereport(ERROR,
> + (errcode(ERRCODE_CANNOT_COERCE),
> + errmsg("row filter returns type %s that cannot
> be coerced to the expected type %s",
> + format_type_be(exprtype),
> + format_type_be(BOOLOID)),
> + errhint("You will need to rewrite the row
> filter.")));
> +
> + exprstate = ExecPrepareExpr(expr, estate);
> +
> + return exprstate;
> +}
> +
> +/*
> + * Evaluates row filter.
> + *
> + * If the row filter evaluates to NULL, it is taken as false i.e. the change
> + * isn't replicated.
> + */
> +static inline bool
> +pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
> +{
> + Datum ret;
> + bool isnull;
> +
> + Assert(state != NULL);
> +
> + ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
> +
> + elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
> + DatumGetBool(ret) ? "true" : "false",
> + isnull ? "true" : "false");
> +
> + if (isnull)
> + return false;
> +
> + return DatumGetBool(ret);
> +}
> +/*
> + * Change is checked against the row filter, if any.
> + *
> + * If it returns true, the change is replicated, otherwise, it is not.
> + */
> +static bool
> +pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple
> newtuple, List *rowfilter)
> +{
> + TupleDesc tupdesc;
> + EState *estate;
> + ExprContext *ecxt;
> + MemoryContext oldcxt;
> + ListCell *lc;
> + bool result = true;
> +
> + /* Bail out if there is no row filter */
> + if (rowfilter == NIL)
> + return true;
> +
> + elog(DEBUG3, "table \"%s.%s\" has row filter",
> +
> get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
> + get_rel_name(relation->rd_id));
> +
> + tupdesc = RelationGetDescr(relation);
> +
> + estate = create_estate_for_relation(relation);
> +
> + /* Prepare context per tuple */
> + ecxt = GetPerTupleExprContext(estate);
> + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
> + ecxt->ecxt_scantuple = ExecInitExtraTupleSlot(estate, tupdesc,
> &TTSOpsHeapTuple);
> + MemoryContextSwitchTo(oldcxt);
> +
> + ExecStoreHeapTuple(newtuple ? newtuple : oldtuple,
> ecxt->ecxt_scantuple, false);
> + /*
> + * If the subscription has multiple publications and the same table has
> a
> + * different row filter in these publications, all row filters must be
> + * matched in order to replicate this change.
> + */
> + foreach(lc, rowfilter)
> + {
> + Node *rfnode = (Node *) lfirst(lc);
> + ExprState *exprstate;
> +
> + /* Prepare for expression execution */
> + exprstate = pgoutput_row_filter_prepare_expr(rfnode, estate);
> +
> + /* Evaluates row filter */
> + result = pgoutput_row_filter_exec_expr(exprstate, ecxt);
Also, this still seems like an *extremely* expensive thing to do for
each tuple. It'll often be *vastly* faster to just send the data to the
other side than to evaluate the filter this way.
This setup work just cannot be redone for every tuple. It has to be cached.
I don't see how these issues can be addressed in the next 7 days,
therefore I think this unfortunately needs to be marked as returned with
feedback.
Greetings,
Andres Freund