threadpool
h1. Using Thread Pool Library
h3. 라이브러리 정보
unsigned int worker_num;
unsigned int max_queue_size;
unsigned int curr_queue_size;
struct threadpool_work *work_queue_head;
struct threadpool_work *work_queue_tail;
pthread_t *threads;
pthread_mutex_t work_lock;
pthread_cond_t work_pool_available;
pthread_cond_t work_pool_exist;
pthread_cond_t work_empty;
int working;
} thpool;
{code}
* 설명
threadpool에 필요한 정보들을 구조체 형식으로 묶어두었다.
worker_num과 max_queue_size는 각각 생성할 스레드 풀의 개수와 작업을 할당할 work pool의 최대 사이즈이다.
curr_queue_size는 현재 work_pool에 할당된 작업의 개수를 나타낸다.
work_queue_head, work_queue_tail은 work pool을 위한 queue의 head와 tail을 나타낸다.
threads는 pthread_create() 함수에 의해 생성된 스레드의 ID값이 저장된다.
work_lock은 스레드 풀의 작업 수행 함수에서 전역적으로 사용되는 데이터들에 대한 mutex lock을 위해 선언되었다.
work_pool_available은 work pool에 할당할 수 있는 작업 공간이 있다는 것에 대한 신호를 보내기 위해 선언되었다.
work_pool_exist는 작업이 할당된 뒤, 작업 수행 함수에게 현재 할당된 작업이 있다는 신호를 보내기 위해 선언되었다.
h3. threadpool.h - 함수
{code:java|title=threadpool_init(), 스레드 풀 초기화 함수}
int threadpool_init( thpool **init_pool, unsigned int worker_num, unsigned int max_queue );
{code}
* threadpool의 초기화를 위한 함수이다.
* 첫 번째 인자인 init_pool은 thpool 구조체 변수가 전달되며, 해당 변수에 대한 별도의 작업은 필요하지 않다.
* 두 번째 인자인 worker_num은 thread pool을 위한 스레드의 개수가 전달된다.
* 세 번째 인자인 max_queue는 work queue의 크기를 나타낸다.
{code:java|title=threadpool_worker(), queue에 할당된 작업을 실행하는 threadpool worker 함수}
void *threadpool_worker( void *arg );
{code}
* queue에 할당된 작업을 수행하기 위한 함수로, 라이브러리 내부에서 실행되기 때문에 사용자는 고려할 필요가 없다.
{code:java|title=threadpool_add_work(), 스레드 풀에 작업을 추가하는 함수}
int threadpool_add_work( thpool *th_pool, void *(*worker)(void *), void *worker_arg );
{code}
* thread pool에 작업을 할당하는 함수이다.
* 첫 번째 인자인 th_pool에는 이전에 초기화 함수를 통하여 전달되었던 첫 번째 변수가 그대로 전달된다.
* 두 번째 인자인 worker는 실제 스레드가 수행할 함수가 전달된다.
* 세 번째 인자인 worker_arg는 그에 대한(스레드 함수) 인자가 전달된다.
{code:java|title=threadpool_adjust(), 스레드 풀 갯수를 재조정하는 함수}
int threadpool_adjust( thpool **adj_pool, unsigned int worker_num, unsigned int max_queue );
{code}
* 중간에 thread pool 및 work pool의 크기를 조절하기 위한 함수로서, 현재는 threadpool의 destroy&init를 wrapping하는 것으로 대체되었다.
* 첫 번째 인자인 adj_pool은 이전에 초기화 함수를 통하여 전달되었던 첫 번째 변수가 그대로 전달된다.
* 두 번째와 세 번째 인자인 worker_num과 max_queue는 각각 thread pool의 크기, work pool의 크기이다.
{code:java|title=threadpool_destroy(), 스레드 풀 소멸 함수}
int threadpool_destroy( thpool *thp );
{code}
* thread pool의 종료 시 소멸 과정을 수행하는 함수이다.
* 인자인 thp에는 이전에 초기화 함수를 통하여 전달되었던 첫 번째 변수가 그대로 전달된다.
{code:java|title=work_queue_none()}
/*
 * Report whether the pool's work queue currently holds no work items.
 *
 * thp: pool to inspect; it is dereferenced unconditionally, so it must
 *      not be NULL.
 *
 * Returns 1 when thp->curr_queue_size is zero, 0 otherwise.
 * The counter is read as-is; this helper performs no locking itself.
 */
static int inline work_queue_none( thpool *thp )
{
    if( thp->curr_queue_size != 0 ) {
        return 0;
    }
    return 1;
}
{code}
* 현재 work pool이 비어있는지 체크하는 함수
* thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
{code:java|title=work_queue_full()}
/*
 * Report whether the pool's work queue has reached its configured capacity.
 *
 * thp: pool to inspect; it is dereferenced unconditionally, so it must
 *      not be NULL.
 *
 * Returns 1 when thp->curr_queue_size equals thp->max_queue_size,
 * 0 otherwise. This helper performs no locking itself.
 */
static int inline work_queue_full( thpool *thp )
{
    if( thp->max_queue_size != thp->curr_queue_size ) {
        return 0;
    }
    return 1;
}
{code}
* 현재 work pool이 최대치인지 체크하는 함수
* thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
{code:java|title=thp_error_handling()}
/* 에러 핸들링 함수, 에러 처리는 모두 해당 함수로 했기 때문에,
 * thread pool 내부에서 사용되는 error handling 전용 함수이다.
 * 에러 처리를 위한 루틴은 따로 wrapping해 두었으며, 수정하여 사용이 가능하다.
 * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
 */
{code}
h3. threadpool_queue.h - 구조체
{code:java|title=thpwork}
/* Work item structure, kept in queue form: one node of the pool's work queue. */
typedef struct threadpool_work {
void *(*worker_func)(void *);   /* function a pool thread runs for this work item */
void *worker_arg;               /* argument handed to worker_func */
struct threadpool_work *prev;   /* link to the preceding node in the work queue */
struct threadpool_work *next;   /* link to the following node in the work queue */
} thpwork;
{code}
* 설명
스레드가 수행할 작업을 work queue 형태로 할당하기 위해 만들어진 구조체로,
worker_func와 worker_arg는 각각 수행할 스레드 함수와 그에 대한 인자를 나타낸다.
prev, next는 worker queue의 전/후를 가리키는 linked list이다.
h3. threadpool_queue.h - 함수
{code:java|title=queue_init()}
void queue_init( thpwork **head, thpwork **tail );
{code}
* work pool을 위한 queue를 초기화하는 함수이다.
* thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
{code:java|title=queue_push()}
void queue_push( thpwork *v, thpwork *head, thpwork *tail );
{code}
* work pool에 작업을 할당하기 위한 queue의 push 함수이다.
* thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
{code:java|title=queue_pop()}
thpwork *queue_pop( thpwork *head, thpwork *tail );
{code}
* work pool에 할당된 작업을 수행하기 위해 가져오는 queue의 pop 함수이다.
* thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
{code:java|title=queue_destroy()}
void queue_destroy( thpwork *head, thpwork *tail );
{code}
* work pool의 소멸과정을 수행하는 queue의 destroy 함수이다.
* thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.
h3. 사용 예 - Pseudo Code
* 해당 라이브러리를 사용하면 몇 번의 함수 호출로 스레드 풀을 간단히 적용할 수 있다.
{code}
void *thrd_func( int *client_sock );
int main( void )
{
    thpool *th_pool; // 스레드 풀에 사용되는 변수
    ...
    threadpool_init( &th_pool, 7, 64 ); // 스레드 풀에 사용되는 초기화 함수
    while(1) {
        cli_sfd = accept(..);
        threadpool_add_work( th_pool, (void *)thrd_func, &cli_sfd ); // 스레드 풀에 사용되는 작업 추가 함수
        if( somecheck ) {
            threadpool_adjust( &th_pool, 5, 32 ); // 스레드 풀에 사용되는 풀 정보 조절 함수
        }
    }
    threadpool_destroy( th_pool ); // 스레드 풀에 사용되는 소멸 함수
    return 0;
}
{code}
It's a poor workman who blames his tools. |