/* This C program is an example of the use of the LAPI active message call. */
/*Am.c
**Example Program illustrating use of the LAPI Active Message Call
*/
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <unistd.h>

#include <lapi.h>
#define A_MAX 2 /* number of slots in the per-task address tables in main() */
#define I_MAX 10 /* number of ints in the message buffer t_buf */
/*
** User header sent along with the active message. The header handler
** at the target reads both fields back out of it.
*/
typedef struct {
    compl_hndlr_t *cmpl_hndlr; /* pointer to completion handler */
    intptr_t uinfo;            /* uinfo passed to the completion handler; */
                               /* it carries a buffer address as an       */
                               /* integer, so it has to be pointer-sized  */
} usrhdr_t;
/* Set to 1 by do_get(); the target branch of main() spins until it is non-zero. */
volatile int cont=0;
/*
** Function: The completion handler for the active message call.
**           This is invoked at the target after all the data
**           of the active message send (LAPI_Amsend) call have
**           reached the target. It prints the first I_MAX ints
**           found at param and then raises the global flag cont.
** Parameters: hndl  -> pointer to the handle for the LAPI context
**                      (not used here)
**             param -> pointer to the user param (specified
**                      by user in the header handler function);
**                      read as an array of I_MAX ints
*/
void
do_get(lapi_handle_t *hndl, void *param)
{
    int *buf = (int *) param;
    int loop;

    (void) hndl; /* required by the handler signature, unused here */

    printf("In Completion Handler: Result of AM call\n");
    /* Print Updated buffer */
    for (loop = 0; loop < I_MAX; loop++) {
        printf("val[%d] = %d\n", loop, buf[loop]);
    }
    /* Tell the code spinning on cont that the handler has run */
    cont = 1;
}
/*
** Function: User's active message header handler. This is invoked
**           at the target when the active message first arrives
**           at the target. It copies the completion handler and
**           the user info out of the user header.
** Parameters: hndl       -> pointer to the handle for the LAPI
**                           context
**             uhdr       -> pointer to the user header (read as a
**                           usrhdr_t)
**             uhdrlen    -> pointer to the length of the user
**                           header
**             msglen     -> pointer to the length of the message
**             cmpl_hndlr -> pointer to the completion handler
**                           function pointer. Set here from the
**                           user header. (CAN be NULL)
**             saved_info -> pointer to the user_info. Set here from
**                           the user header; this value is then
**                           passed to the completion handler when
**                           the completion handler is invoked.
** Returns: the address carried in the header's uinfo field, i.e. the
**          same value stored in *saved_info (presumably the buffer
**          LAPI delivers the message data into -- confirm against
**          the LAPI_Amsend documentation).
*/
void *
hdr_hndlr(lapi_handle_t *hndl, void *uhdr, uint *uhdrlen, uint *msglen,
          compl_hndlr_t **cmpl_hndlr, void **saved_info)
{
    usrhdr_t *hdr;

    printf("In Header Handler\n");

    hdr = (usrhdr_t *) uhdr;
    /* Hand the sender's choice of completion handler back to LAPI */
    *cmpl_hndlr = hdr->cmpl_hndlr;
    /* The same address is both the completion handler's argument ... */
    *saved_info = (void *) hdr->uinfo;
    /* ... and this handler's return value */
    return (void *) hdr->uinfo;
}
int
main(int argc, char **argv) main
{
lapi_handle_t t_hndl; /* LAPI context handle - */
/* returned */
lapi_info_t t_info; /* LAPI info structure */
int task_id, /* My task id */
num_tasks; /* Number of tasks in my */
/* job */
lapi_cntr_t l_cntr; /* Origin counter */
lapi_cntr_t t_cntr; /* Target counter */
lapi_cntr_t c_cntr; /* Completion counter */
int t_buf[I_MAX]; /* Buffer to manipulate */
void *global_addr[A_MAX]; /* Array to store */
/* t_buf addr from all the */
/* tasks. The size of this */
/* array needs to each */
/* number of tasks */
void *tgt_addr[A_MAX]; /* Array to store target */
/* counter addr from all */
/* the tasks. */
void *hndlr_addr[A_MAX]; /* Array to store header */
/* handlers */
void *cmpl_hndlr_addr[A_MAX]; /* Address of */
/* completion handler */
usrhdr_t t_uhdr; /* Store Header Handler */
/* information */
void *uhdr, *udata;
int uhdrlen, udatalen;
int loop, rc, tgt, val, cur_val;
char err_msg_buf[LAPI_MAX_ERR_STRING];
bzero(&t_info, sizeof(lapi_info_t));
t_info.err_hndlr = NULL; /* Not registering error handler */
/* function */
...main
if ((rc = LAPI_Init(&t_hndl, &t_info)) != LAPI_SUCCESS) {
LAPI_Msg_string(rc, err_msg_buf);
printf("Error Message: %s, rc = %d\n", err_msg_buf, rc);
exit (rc);
}
rc = LAPI_Qenv(t_hndl, TASK_ID, &task_id); /* Get task id */
/* within job */
rc = LAPI_Qenv(t_hndl, NUM_TASKS, &num_tasks); /* Get no. of tasks */
/* in job */
if (num_tasks != 2) {
printf("Error Message: Program should run on 2 nodes\n");
exit(1);
}
/* Turn off parameter checking - default is on */
rc = LAPI_Senv(t_hndl, ERROR_CHK, 0);
/* Initialize counters to be zero at the start */
rc = LAPI_Setcntr(t_hndl, &l_cntr, 0);
rc = LAPI_Setcntr(t_hndl, &t_cntr, 0);
rc = LAPI_Setcntr(t_hndl, &c_cntr, 0);
/*
** Exchange buffer address, tgt_cntr address and hdr_hndlr address
** and completion handler address of every task. Collective calls
*/
rc = LAPI_Address_init(t_hndl, t_buf, global_addr);
rc = LAPI_Address_init(t_hndl, &t_cntr, tgt_addr);
rc = LAPI_Address_init(t_hndl, (void *)&hdr_hndlr, hndlr_addr);
rc = LAPI_Address_init(t_hndl, (void *)&do_get, cmpl_hndlr_addr);
if (task_id == 0) { /* Task id is 0 , Origin */
tgt = task_id + 1;
for (loop=0; loop < I_MAX; loop++) { /* Update buffer */
t_buf[loop] = task_id - loop;
}
rc = LAPI_Gfence(t_hndl); /* Global fence to sync before */
/* starting */
/* Fill in uhdr and udata buffers for AM call */
t_uhdr.cmpl_hndlr = (compl_hndlr_t *) cmpl_hndlr_addr[1];
t_uhdr.uinfo = (int)global_addr[tgt];
uhdr = (void *)&t_uhdr;
uhdrlen = sizeof(usrhdr_t);
udata = (void *) t_buf;
udatalen = I_MAX*sizeof(int);
rc = LAPI_Amsend(t_hndl, tgt, hndlr_addr[tgt], uhdr, uhdrlen,
(void *) udata, udatalen, tgt_addr[tgt], &l_cntr, &c_cntr);
/* Wait for local AM completion */
rc = LAPI_Waitcntr(t_hndl, &l_cntr, 1, &cur_val);
/* Can now change local buffer */
for (loop=0; loop < I_MAX; loop++) { /* Update buffer */
t_buf[loop] = loop * task_id; ...main
}
/* Wait for target AM completion */
rc = LAPI_Waitcntr(t_hndl, &c_cntr, 1, &cur_val);
printf("Node %d, done issuing AM to node %d\n", task_id, tgt);
rc = LAPI_Gfence(t_hndl);
rc = LAPI_Get(t_hndl,tgt,I_MAX*sizeof(int), global_addr[tgt],
(void *)t_buf, tgt_addr[tgt],&l_cntr);
/* Wait for local Get completion */
rc = LAPI_Waitcntr(t_hndl, &l_cntr, 1, NULL);
printf("Node %d, done issuing Get from node %d\n", task_id, tgt);
printf("Result of Get after the Am from node %d: \n", tgt);
for (loop=0; loop < I_MAX; loop++) { /* Update buffer */
printf("Val[%d] = %d\n", loop, t_buf[loop]);
}
} else { /* Task id is 1 , Target */
tgt = task_id - 1;
for (loop=0; loop < I_MAX; loop++) { /* Zero out buffer */
t_buf[loop] = 0;
}
rc = LAPI_Gfence(t_hndl); /* Global fence to sync before */
/* starting */
/* Process AM */
rc = LAPI_Getcntr(t_hndl, &t_cntr, &val);
while (val < 1) {
sleep(1); /* Do some work */
rc = LAPI_Probe(t_hndl); /* Poll the adapter once */
rc = LAPI_Getcntr(t_hndl, &t_cntr, &val);
}
/* To reset the t_cntr value */
rc = LAPI_Waitcntr(t_hndl, &t_cntr, 1, &cur_val);
printf("Node %d, done doing work and processing AM\n", task_id);
while (!cont) {
sleep(1); /* Do some work */
}
rc = LAPI_Gfence(t_hndl);
/* Process Get */
rc = LAPI_Getcntr(t_hndl, &t_cntr, &val);
while (val < 1) {
sleep(1); /* Do some work */
rc = LAPI_Probe(t_hndl); /* Poll the adapter once */
rc = LAPI_Getcntr(t_hndl, &t_cntr, &val);
}
/* To reset the t_cntr value */
rc = LAPI_Waitcntr(t_hndl, &t_cntr, 1, &cur_val);
printf("Node %d, done doing work and processing Get\n", task_id);
}
rc = LAPI_Gfence(t_hndl); /* Global fence to sync before */
/* terminating job */
rc = LAPI_Term(t_hndl);
}