· KLDP.org · KLDP.net · KLDP Wiki · KLDP BBS ·
threadpool

h1. Using Thread Pool Library

h3. 라이브러리 정보
  • 라이브러리 위치: libvos/include/threadpool/*
  • 라이브러리 파일: threadpool.h threadpool_queue.h threadpool.c threadpool_queue.c
  • include는 "threadpool.h"만 하면 됨


h3. threadpool.h - 구조체 {code:java|title=thpool} /* threadpool에 필요한 모든 정보를 포함하는 구조체 */ typedef struct threadpool_all {
unsigned int worker_num; unsigned int max_queue_size; unsigned int curr_queue_size; struct threadpool_work *work_queue_head; struct threadpool_work *work_queue_tail; pthread_t *threads; pthread_mutex_t work_lock; pthread_cond_t work_pool_available; pthread_cond_t work_pool_exist; pthread_cond_t work_empty; int working;
} thpool; {code}

* 설명 threadpool에 필요한 정보들을 구조체 형식으로 묶어두었다. worker_num과 max_queue_size는 각각 생성할 스레드 풀의 개수와 작업을 할당할 work pool의 최대 사이즈이다. curr_queue_size는 현재 work_pool에 할당된 작업의 개수를 나타낸다. work_queue_head, work_queue_tail은 work pool을 위한 queue의 head와 tail을 나타낸다. threads는 pthread_create() 함수에 의해 생성된 스레드의 ID값이 저장된다. work_lock은 스레드 풀의 작업 수행 함수에서 전역적으로 사용되는 데이터들에 대한 mutex lock을 위해 선언되었다. work_pool_available은 work pool에 할당할 수 있는 작업 공간이 있다는 것에 대한 신호를 보내기 위해 선언되었다. work_pool_exist는 작업이 할당된 뒤, 작업 수행 함수에게 현재 할당된 작업이 있다는 신호를 보내기 위해 선언되었다.


h3. threadpool.h - 함수

{code:java|title=threadpool_init(), 스레드 풀 초기화 함수} int threadpool_init( thpool **init_pool, unsigned int worker_num, unsigned int max_queue ); {code}

* threadpool의 초기화를 위한 함수이다. * 첫 번째 인자인 init_pool은 thpool 구조체 변수가 전달되며, 해당 변수에 대한 별도의 작업은 필요하지 않다. * 두 번째 인자인 worker_num은 thread pool을 위한 스레드의 개수가 전달된다. * 세 번째 인자인 max_queue는 work queue의 크기를 나타낸다.



{code:java|title=threadpool_worker(), queue에 할당된 작업을 실행하는 threadpool worker 함수} void *threadpool_worker( void *arg ); {code}

* queue에 할당된 작업을 수행하기 위한 함수로, 라이브러리 내부에서 실행되기 때문에 사용자는 고려할 필요가 없다.



{code:java|title=threadpool_add_work(), 스레드 풀에 작업을 추가하는 함수} int threadpool_add_work( thpool *th_pool, void *(*worker)(void *), void *worker_arg ); {code}

* thread pool에 작업을 할당하는 함수이다. * 첫 번째 인자인 th_pool에는 이전에 초기화 함수를 통하여 전달되었던 첫 번째 변수가 그대로 전달된다. * 두 번째 인자인 worker는 실제 스레드가 수행할 함수가 전달된다. * 세 번째 인자인 worker_arg는 그에 대한(스레드 함수) 인자가 전달된다.


{code:java|title=threadpool_adjust(), 스레드 풀 개수를 재조정하는 함수} int threadpool_adjust( thpool **adj_pool, unsigned int worker_num, unsigned int max_queue ); {code}

* 중간에 thread pool 및 work pool의 크기를 조절하기 위한 함수로서, 현재는 threadpool의 destroy&init를 wrapping하는 것으로 대체되었다. * 첫 번째 인자인 adj_pool은 이전에 초기화 함수를 통하여 전달되었던 첫 번째 변수가 그대로 전달된다. * 두 번째와 세 번째 인자인 worker_num과 max_queue는 각각 thread pool의 크기, work pool의 크기이다.



{code:java|title=threadpool_destroy(), 스레드 풀 소멸 함수} int threadpool_destroy( thpool *thp ); {code}

* thread pool의 종료 시 소멸 과정을 수행하는 함수이다. * 인자인 thp에는 이전에 초기화 함수를 통하여 전달되었던 첫 번째 변수가 그대로 전달된다.


{code:java|title=work_queue_none()} static int inline work_queue_none( thpool *thp ) {
return (thp->curr_queue_size == 0);
} {code}

* 현재 work pool이 비어있는지 체크하는 함수 * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.

{code:java|title=work_queue_full()} static int inline work_queue_full( thpool *thp ) {
return (thp->curr_queue_size == thp->max_queue_size);
} {code}

* 현재 work pool이 최대치인지 체크하는 함수 * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.

{code:java|title=thp_error_handling()} /* 에러 핸들링 함수, 에러 처리는 모두 해당 함수로 했기 때문에,
 * 차후 함수를 수정하여 콘솔 출력 등으로 바꿔 사용 할 수 있음. */
static void inline thp_error_handling( char *str ) {
char err_msg[512];

snprintf( err_msg, sizeof(err_msg) -1, "threadpool> %s", str ); perror(err_msg);
} {code}

* thread pool 내부에서 사용되는 error handling 전용 함수이다. * 에러 처리를 위한 루틴은 따로 wrapping해 두었으며, 수정하여 사용이 가능하다. * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.



h3. threadpool_queue.h - 구조체 {code:java|title=thpwork} /* queue 형태로 되어있는 work 구조체 */ typedef struct threadpool_work {
void *(*worker_func)(void *); void *worker_arg; struct threadpool_work *prev; struct threadpool_work *next;
} thpwork; {code}

* 설명 스레드가 수행할 작업을 work queue 형태로 할당하기 위해 만들어진 구조체로, worker_func와 worker_arg는 각각 수행할 스레드 함수와 그에 대한 인자를 나타낸다. prev, next는 worker queue의 전/후를 가리키는 linked list이다.

h3. threadpool_queue.h - 함수

{code:java|title=queue_init()} void queue_init( thpwork **head, thpwork **tail ); {code}

* work pool을 위한 queue를 초기화하는 함수이다. * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.

{code:java|title=queue_push()} void queue_push( thpwork *v, thpwork *head, thpwork *tail ); {code}

* work pool에 작업을 할당하기 위한 queue의 push 함수이다. * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.


{code:java|title=queue_pop()} thpwork *queue_pop( thpwork *head, thpwork *tail ); {code}

* work pool에 할당된 작업을 수행하기 위해 가져오는 queue의 pop 함수이다. * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.

{code:java|title=queue_destroy()} void queue_destroy( thpwork *head, thpwork *tail ); {code}

* work pool의 소멸과정을 수행하는 queue의 destroy 함수이다. * thread pool 라이브러리 내부에서 호출되므로 사용자는 해당 함수를 고려할 필요가 없다.

h3. 사용 예 - Pseudo Code * 해당 라이브러리를 사용하면 몇 번의 함수 호출로 스레드 풀을 간단히 적용할 수 있다. {code} void *thrd_func( int *client_sock );

int main( void ) {
thpool *th_pool; // 스레드 풀에 사용되는 변수

...

threadpool_init( &th_pool, 7, 64 ); // 스레드 풀에 사용되는 초기화 함수

while(1) {
cli_sfd = accept(..);

threadpool_add_work( th_pool, (void *)thrd_func, &cli_sfd ); // 스레드 풀에 사용되는 작업 추가 함수

if( somecheck ) {
threadpool_adjust( &th_pool, 5, 32 ); // 스레드 풀에 사용되는 풀 정보 조절 함수
}
} threadpool_destroy( th_pool ); // 스레드 풀에 사용되는 소멸 함수

return 0;
} {code}



sponsored by andamiro
sponsored by cdnetworks
sponsored by HP

Valid XHTML 1.0! Valid CSS! powered by MoniWiki
last modified 2011-01-31 23:15:59
Processing time 0.0071 sec