On Fri, 27 Sept 2024 at 08:17, Nathan Bossart <nathandboss...@gmail.com> wrote:
> Here's a patch that adjusts several routines in nbtcompare.c and related
> files to use the branchless integer comparison functions added in commit
> 6b80394.  It's probably unlikely this produces a measurable benefit (at
> least I've been unable to find any in my admittedly-limited testing), but
> in theory it should save a cycle here and there.  I was hoping that this
> would trim many lines of code, but maintaining the STRESS_SORT_INT_MIN
> stuff eats up most of what we save.

I had been looking at [1] (which I've added your version to now). I
had been surprised to see gcc emitting different code for the first 3
versions. Clang does a better job at figuring out they all do the same
thing and emitting the same code for each.

I played around with the attached (hacked up) qsort.c to see if there
was any difference.  Likely function call overhead kills the
performance anyway. There does not seem to be much difference between
them. I've not tested with an inlined comparison function.

Looking at your version, it doesn't look like there's any sort of
improvement in terms of the instructions. Certainly, for clang, it's
worse as it adds a shift left instruction and an additional compare.
No jumps, at least.

What's your reasoning for returning INT_MIN and INT_MAX?

David

[1] https://godbolt.org/z/33T8h151M
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define swapcode(TYPE, parmi, parmj, n) \
do {            \
        size_t i = (n) / sizeof (TYPE);                 \
        TYPE *pi = (TYPE *)(void *)(parmi);                     \
        TYPE *pj = (TYPE *)(void *)(parmj);                     \
        do {                                            \
                TYPE    t = *pi;                        \
                *pi++ = *pj;                            \
                *pj++ = t;                              \
                } while (--i > 0);                              \
} while (0)

#define SWAPINIT(a, es) swaptype = ((char *)(a) - (char *)0) % sizeof(long) || \
        (es) % sizeof(long) ? 2 : (es) == sizeof(long)? 0 : 1

#define Min(x, y)               ((x) < (y) ? (x) : (y))

static void
swapfunc(char *a, char *b, size_t n, int swaptype)
{
        if (swaptype <= 1)
                swapcode(long, a, b, n);
        else
                swapcode(char, a, b, n);
}

#define swap(a, b)                                              \
        if (swaptype == 0) {                                    \
                long t = *(long *)(void *)(a);                  \
                *(long *)(void *)(a) = *(long *)(void *)(b);    \
                *(long *)(void *)(b) = t;                       \
        } else                                                  \
                swapfunc(a, b, es, swaptype)

#define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n, swaptype)

static char *
med3(char *a, char *b, char *c, int (*cmp) (const void *, const void *))
{
        return cmp(a, b) < 0 ?
                (cmp(b, c) < 0 ? b : (cmp(a, c) < 0 ? c : a))
                : (cmp(b, c) > 0 ? b : (cmp(a, c) < 0 ? a : c));
}

void
pg_qsort(void *a, size_t n, size_t es, int (*cmp) (const void *, const void *))
{
                char       *pa,
                                                   *pb,
                                                   *pc,
                                                   *pd,
                                                   *pl,
                                                   *pm,
                                                   *pn;
                size_t            d1,
                                                                d2;
                int                                      r,
                                                                swaptype,
                                                                presorted;

loop:SWAPINIT(a, es);
                if (n < 7)
                {
                                for (pm = (char *) a + es; pm < (char *) a + n 
* es; pm += es)
                                                for (pl = pm; pl > (char *) a 
&& cmp(pl - es, pl) > 0;
                                                                 pl -= es)
                                                                swap(pl, pl - 
es);
                                return;
                }
                presorted = 1;
                for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
                {
                                if (cmp(pm - es, pm) > 0)
                                {
                                                presorted = 0;
                                                break;
                                }
                }
                if (presorted)
                                return;
                pm = (char *) a + (n / 2) * es;
                if (n > 7)
                {
                                pl = (char *) a;
                                pn = (char *) a + (n - 1) * es;
                                if (n > 40)
                                {
                                                size_t            d = (n / 8) * 
es;

                                                pl = med3(pl, pl + d, pl + 2 * 
d, cmp);
                                                pm = med3(pm - d, pm, pm + d, 
cmp);
                                                pn = med3(pn - 2 * d, pn - d, 
pn, cmp);
                                }
                                pm = med3(pl, pm, pn, cmp);
                }
                swap(a, pm);
                pa = pb = (char *) a + es;
                pc = pd = (char *) a + (n - 1) * es;
                for (;;)
                {
                                while (pb <= pc && (r = cmp(pb, a)) <= 0)
                                {
                                                if (r == 0)
                                                {
                                                                swap(pa, pb);
                                                                pa += es;
                                                }
                                                pb += es;
                                }
                                while (pb <= pc && (r = cmp(pc, a)) >= 0)
                                {
                                                if (r == 0)
                                                {
                                                                swap(pc, pd);
                                                                pd -= es;
                                                }
                                                pc -= es;
                                }
                                if (pb > pc)
                                                break;
                                swap(pb, pc);
                                pb += es;
                                pc -= es;
                }
                pn = (char *) a + n * es;
                d1 = Min(pa - (char *) a, pb - pa);
                vecswap(a, pb - d1, d1);
                d1 = Min(pd - pc, pn - pd - es);
                vecswap(pb, pn - d1, d1);
                d1 = pb - pa;
                d2 = pd - pc;
                if (d1 <= d2)
                {
                                /* Recurse on left partition, then iterate on 
right partition */
                                if (d1 > es)
                                                pg_qsort(a, d1 / es, es, cmp);
                                if (d2 > es)
                                {
                                                /* Iterate rather than recurse 
to save stack space */
                                                /* pg_qsort(pn - d2, d2 / es, 
es, cmp); */
                                                a = pn - d2;
                                                n = d2 / es;
                                                goto loop;
                                }
                }
                else
                {
                                /* Recurse on right partition, then iterate on 
left partition */
                                if (d2 > es)
                                                pg_qsort(pn - d2, d2 / es, es, 
cmp);
                                if (d1 > es)
                                {
                                                /* Iterate rather than recurse 
to save stack space */
                                                /* pg_qsort(a, d1 / es, es, 
cmp); */
                                                n = d1 / es;
                                                goto loop;
                                }
                }
}

int cmp1(const void *pa, const void *pb)
{
        int a = *(int *) pa;
        int b = *(int *) pb;
        
        if (a > b)
                return 1;
        else if (a == b)
                return 0;
        else
                return -1;
}

int cmp2(const void *pa, const void *pb)
{
        int a = *(int *) pa;
        int b = *(int *) pb;
        
        return (a > b) - (a < b);
}

int cmp3(const void *pa, const void *pb)
{
        int a = *(int *) pa;
        int b = *(int *) pb;
        
        if (a < b)
                return -1;
        else if (a > b)
                return 1;
        else
                return 0;
}

#define STRESS_SORT_INT_MIN_CMP(a, b) \
( \
        ((a) > (b)) ? INT_MAX : \
        ((a) < (b)) ? INT_MIN : \
        0 \
)

int cmp4(const void *pa, const void *pb)
{
        int a = *(int *) pa;
        int b = *(int *) pb;
        
        return STRESS_SORT_INT_MIN_CMP(a, b);
}

#define NS_PER_SECOND 1000000000

void sub_timespec(struct timespec t1, struct timespec t2, struct timespec *td)
{
    td->tv_nsec = t2.tv_nsec - t1.tv_nsec;
    td->tv_sec  = t2.tv_sec - t1.tv_sec;
    if (td->tv_sec > 0 && td->tv_nsec < 0)
    {
        td->tv_nsec += NS_PER_SECOND;
        td->tv_sec--;
    }
    else if (td->tv_sec < 0 && td->tv_nsec > 0)
    {
        td->tv_nsec -= NS_PER_SECOND;
        td->tv_sec++;
    }
}

#define ARRAY_SIZE 50000
#define LOOPS 100

int main(int argc, char **argv)
{
        struct timespec start, finish, delta;
        int *random_array = malloc(ARRAY_SIZE * sizeof(int));
        int *array2sort = malloc(ARRAY_SIZE * sizeof(int));
        
        if (argc >= 2)
        {
                int rs = atoi(argv[1]);
                srand(rs);
                printf("random seed set to %d\n", rs);
        }
        
        for (int i = 0; i < ARRAY_SIZE; i++)
                random_array[i] = (int) (rand() * INT_MAX);

        // for(int i=0; i < 10; i++)
                // printf("%d. %d\n", i,random_array[i]);

        clock_gettime(CLOCK_REALTIME, &start);
        for (int i = 0; i < LOOPS; i++)
        {
                memcpy(array2sort, random_array, ARRAY_SIZE * sizeof(int));
                pg_qsort(array2sort, ARRAY_SIZE, sizeof(int), cmp1);
        }
        clock_gettime(CLOCK_REALTIME, &finish);
        sub_timespec(start, finish, &delta);
        printf("cmp1 done in %d.%.9ld seconds. Lowest value is %d\n", 
(int)delta.tv_sec, delta.tv_nsec, array2sort[0]);

        clock_gettime(CLOCK_REALTIME, &start);
        for (int i = 0; i < LOOPS; i++)
        {
                memcpy(array2sort, random_array, ARRAY_SIZE * sizeof(int));
                pg_qsort(array2sort, ARRAY_SIZE, sizeof(int), cmp2);
        }
        clock_gettime(CLOCK_REALTIME, &finish);
        sub_timespec(start, finish, &delta);
        printf("cmp2 done in %d.%.9ld seconds. Lowest value is %d\n", 
(int)delta.tv_sec, delta.tv_nsec, array2sort[0]);

        clock_gettime(CLOCK_REALTIME, &start);
        for (int i = 0; i < LOOPS; i++)
        {
                memcpy(array2sort, random_array, ARRAY_SIZE * sizeof(int));
                pg_qsort(array2sort, ARRAY_SIZE, sizeof(int), cmp3);
        }
        clock_gettime(CLOCK_REALTIME, &finish);
        sub_timespec(start, finish, &delta);
        printf("cmp3 done in %d.%.9ld seconds. Lowest value is %d\n", 
(int)delta.tv_sec, delta.tv_nsec, array2sort[0]);

        clock_gettime(CLOCK_REALTIME, &start);
        for (int i = 0; i < LOOPS; i++)
        {
                memcpy(array2sort, random_array, ARRAY_SIZE * sizeof(int));
                pg_qsort(array2sort, ARRAY_SIZE, sizeof(int), cmp4);
        }
        clock_gettime(CLOCK_REALTIME, &finish);
        sub_timespec(start, finish, &delta);
        printf("cmp4 done in %d.%.9ld seconds. Lowest value is %d\n", 
(int)delta.tv_sec, delta.tv_nsec, array2sort[0]);

        return 0;
}

Reply via email to