c使用栅栏(barrier)进行多线程(pthread)排序

一、环境:

macOS

二、排序数据量:

800万(8,000,000)

三、执行结果:

8个线程:执行时间约为2秒。

1个线程:执行时间约为6秒。

pthread_barrier.h

//
//  pthread_barrier.h
//  TestC
//
//  Created by TangPing on 2018/1/13.
//  Copyright © 2018年 TangPing. All rights reserved.
//

#ifndef pthread_barrier_h
#define pthread_barrier_h

#ifdef __APPLE__

#include <pthread.h>
#include <errno.h>

typedef int pthread_barrierattr_t;

/*
 * Minimal barrier emulation for macOS, whose <pthread.h> does not provide
 * POSIX barriers. Built from a mutex and a condition variable.
 *
 * The functions below are `static inline` so this header can be included
 * from more than one translation unit without multiple-definition errors.
 */
typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    unsigned int count;        /* threads currently waiting in this cycle */
    unsigned int tripCount;    /* threads required to release the barrier */
    unsigned long generation;  /* incremented every time the barrier trips */
} pthread_barrier_t;


/*
 * Initialize *barrier so that it releases its waiters once `count` threads
 * have called pthread_barrier_wait(). `attr` is accepted for API
 * compatibility and ignored.
 *
 * Returns 0 on success, or -1 with errno set: EINVAL when count == 0,
 * otherwise the error number reported by pthread_mutex_init or
 * pthread_cond_init.
 */
static inline int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count)
{
    int err;

    (void)attr;
    if (count == 0)
    {
        errno = EINVAL;
        return -1;
    }
    /* pthread_*_init report failure as a positive error number, not -1. */
    err = pthread_mutex_init(&barrier->mutex, NULL);
    if (err != 0)
    {
        errno = err;
        return -1;
    }
    err = pthread_cond_init(&barrier->cond, NULL);
    if (err != 0)
    {
        pthread_mutex_destroy(&barrier->mutex);
        errno = err;
        return -1;
    }
    barrier->tripCount = count;
    barrier->count = 0;
    barrier->generation = 0;

    return 0;
}

/*
 * Release the mutex and condition variable owned by *barrier.
 * Returns 0 on success, or -1 with errno set to the first error reported
 * by pthread_cond_destroy / pthread_mutex_destroy.
 */
static inline int pthread_barrier_destroy(pthread_barrier_t *barrier)
{
    int cerr = pthread_cond_destroy(&barrier->cond);
    int merr = pthread_mutex_destroy(&barrier->mutex);

    if (cerr != 0 || merr != 0)
    {
        errno = (cerr != 0) ? cerr : merr;
        return -1;
    }
    return 0;
}

/*
 * Block until tripCount threads have reached the barrier.
 * Returns 1 in the thread that tripped the barrier and 0 in all others.
 */
static inline int pthread_barrier_wait(pthread_barrier_t *barrier)
{
    unsigned long gen;
    int serial = 0;

    pthread_mutex_lock(&barrier->mutex);
    gen = barrier->generation;
    if (++barrier->count >= barrier->tripCount)
    {
        /* Last arrival: start a new cycle and wake everyone in this one. */
        barrier->count = 0;
        barrier->generation++;
        pthread_cond_broadcast(&barrier->cond);
        serial = 1;
    }
    else
    {
        /*
         * pthread_cond_wait may wake spuriously, so only leave once this
         * cycle's generation has actually advanced.
         */
        while (gen == barrier->generation)
            pthread_cond_wait(&barrier->cond, &barrier->mutex);
    }
    pthread_mutex_unlock(&barrier->mutex);
    return serial;
}

#endif // __APPLE__

#endif /* pthread_barrier_h */

multi_thread.c

//
//  multi_thread.c
//  TestC
//
//  Created by TangPing on 2018/1/13.
//  Copyright © 2018年 TangPing. All rights reserved.
//

#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include "pthread_barrier.h"

#define NTHR   8                /* number of threads */
#define NUMNUM 8000000L         /* number of numbers to sort */
#define TNUM   (NUMNUM/NTHR)    /* number to sort per thread */

/*
 * Each worker sorts exactly TNUM elements and merge() only walks the
 * NTHR slices of TNUM elements, so a remainder would never be sorted.
 */
#if NUMNUM % NTHR != 0
#error "NUMNUM must be an exact multiple of NTHR"
#endif

long nums[NUMNUM];      /* input; each TNUM-sized slice is sorted in place */
long snums[NUMNUM];     /* merged, fully sorted output */

pthread_barrier_t b;    /* main() initializes it for NTHR workers + itself */

/*
 * heapsort(3) is a BSD extension (macOS, FreeBSD, ...). Everywhere else,
 * including the original explicit SOLARIS opt-in, fall back to the
 * standard qsort(3), which has a compatible argument list.
 */
#if !defined(SOLARIS) && (defined(__APPLE__) || defined(__FreeBSD__) || \
    defined(__NetBSD__) || defined(__OpenBSD__) || defined(__DragonFly__))
extern int heapsort(void *, size_t, size_t,
                    int (*)(const void *, const void *));
#else
#define heapsort qsort
#endif

/*
 * Compare two long integers (helper function for heapsort)
 */

/*
 * Three-way comparator for two longs, in the form qsort/heapsort expect:
 * returns -1, 0 or 1 as *arg1 is less than, equal to, or greater than
 * *arg2. Uses comparisons rather than subtraction, so it cannot overflow.
 */
int
complong(const void *arg1, const void *arg2)
{
    const long lhs = *(const long *)arg1;
    const long rhs = *(const long *)arg2;

    return (lhs > rhs) - (lhs < rhs);
}

/*
 * Worker thread to sort a portion of the set of numbers.
 */

/*
 * Worker thread start routine. `arg` carries the starting index (cast to a
 * pointer) of this worker's TNUM-element slice of nums[]. The slice is
 * sorted in place, then the worker meets the other threads at barrier b.
 */
void *
thr_fn(void *arg)
{
    long *slice = nums + (long)arg;

    heapsort(slice, TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);

    /* A real worker could carry on with more work at this point. */
    return NULL;
}

/*
 * Merge the results of the individual sorted ranges.
 */

void
merge()
{
    long    idx[NTHR];
    long    i, minidx, sidx, num;
   
    for (i = 0; i < NTHR; i++)
        idx[i] = i * TNUM;
    for (sidx = 0; sidx < NUMNUM; sidx++) {
        num = LONG_MAX;
        /*
         * 取出每个段中的最小值,然后存入snums中,并且段索引自增
         */

        for (i = 0; i < NTHR; i++) {
            if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {
                num = nums[idx[i]];
                minidx = i;
            }
        }
        snums[sidx] = nums[idx[minidx]];
        idx[minidx]++;
    }
}

int
main()
{
    unsigned long   i;
    struct timeval  start, end;
    long long       startusec, endusec;
    double          elapsed;
    int             err;
    pthread_t       tid;
   
    /*
     * Create the initial set of numbers to sort.
     */

    srandom(1);
    for (i = 0; i < NUMNUM; i++)
        nums[i] = random();
   
    /*
     * Create 8 threads to sort the numbers.
     */

    gettimeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR+1);
    for (i = 0; i < NTHR; i++) {
        err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
        if (err != 0){
            printf("can't create thread");
            exit(err);
            //err_exit(err, "can't create thread");
        }
    }
    pthread_barrier_wait(&b);
    merge();
    gettimeofday(&end, NULL);
   
    /*
     * Print the sorted list.
     */

    startusec = start.tv_sec * 1000000 + start.tv_usec;
    endusec = end.tv_sec * 1000000 + end.tv_usec;
    elapsed = (double)(endusec - startusec) / 1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    for (i = 0; i < NUMNUM; i++){
        printf("%ld\n", snums[i]);
    }
    exit(0);
}

Leave a Comment