| Aspect | Where | Lines |
|---|---|---|
| control_sender.xsl | sender | 117 |
| sla_sender.xsl | sender | 73 |
| sender.xsl | sender | 30 |
| control_receiver.xsl | receiver | 125 |
| timing.xsl | receiver | 50 |
| cpumon.xsl | receiver | 14 |
| sla_receiver.xsl | receiver | 55 |
| receiver.xsl | receiver | 18 |
| TOTAL | | 482 |
|
all:: sender receiver sender:: @echo Building sender cd sender; make; cd .. receiver:: @echo Building receiver cd receiver; make; cd .. CC = gcc LD = gcc all:: post-all #call comm layer make files to build ports OBJS := $(OBJS) ppmOut.o ppmOut.o: ppmOut.c $(CC) -Wall -g -c ppmOut.c #build the body of the pipe sender.o: sender.c $(CC) -Wall -g -c sender.c #build the C library runtime.o: sender.c $(CC) -Wall -g -c runtime.c OBJS := $(OBJS) sla_sender.o sla_sender.o: sla.c $(CC) -Wall -g -o sla_sender.o -c sla.c OBJS := $(OBJS) control_sender.o LDFLAGS := $(LDFLAGS) -lpthread control_sender.o: control.c $(CC) -Wall -g -o control_sender.o -c control.c #linksender: sender.o runtime.o ${OBJS} # ar rcs libsender.a sender.o ppmOut.o ppmIn.o ${LD} -o $@ sender.o runtime.o ${OBJS} ${LDFLAGS} clean: rm -f sender.o $(OBJS) runtime.o post-all: sender.o ppmOut.o ${OBJS} runtime.o #ifndef CRUNTIMEINCLUDED #define CRUNTIMEINCLUDED #define PUBLISH_FILE 0x00000001 int lookup(char *pipepath, char *pipe, char *portName, int publish_where, char **str); int publish(char *pipepath, char *pipe, char *portName, char *str, int type); int unpublish(char *pipepath, char *pipe, char *portname, int type); #endif #include <string.h> #include <stdio.h> #include <stdlib.h> #include <sys/stat.h> #include <errno.h> #include "runtime.h" static int makePathDir(char *, mode_t ); static int lookupFile(char *pipepath, char *pipe, char *portname, char **str); static int publishFile(char *pipepath, char *pipe, char *portname, char *str); static int unpublishFile(char *pipepath, char *pipe, char *portname); int lookup(char *pipepath, char *pipe, char *portname, int publish_where, char **str) { if (publish_where & PUBLISH_FILE) { return lookupFile(pipepath, pipe, portname, str); } fprintf(stderr,"LOOKUP TYPE UNKNOWN."); return 8; } static int lookupFile(char *pipepath, char *pipe, char *portname, char **str) { char *publishRoot; char *filename; FILE *ifile; struct stat filestat; if((publishRoot = 
getenv("INFOPIPE_REPOSITORY")) == NULL) { publishRoot = getenv("PWD"); } filename = (char *)malloc(sizeof(char) * (strlen(publishRoot) + strlen("running/") + strlen(pipepath) + strlen(pipe) + strlen(portname) + 10) ); filename[0] = 0; strcat(filename, publishRoot); if( filename[strlen(publishRoot)-1] != '/') strcat(filename, "/"); strcat( filename, "running/"); strcat( filename, pipepath); if( filename[ strlen(filename) - 1 ] != '/' ) strcat( filename, "/"); strcat( filename, pipe); strcat( filename, "/"); strcat( filename, portname); fprintf(stdout, "Trying to read '%s'.\n", filename); if( stat( filename, &filestat ) ) { fprintf(stderr, "'%s' did not work as a file.", filename ); return 8; } fprintf(stdout, "file size is %ld.\n", filestat.st_size ); *str = (char *) malloc(filestat.st_size * sizeof(char) + 20); if( !str ) { fprintf(stderr, "lookupFile() malloc() failed.\n" ); } ifile = fopen( filename, "r"); if(ifile && !feof(ifile)) { fgets(*str, filestat.st_size + 1, ifile); } else { fprintf(stderr,"Error in reading file '%s'.\n", filename); return 3; } return 0; } int publish(char *pipepath, char *pipe, char *portname, char *str, int type) { if(type & PUBLISH_FILE) { publishFile(pipepath, pipe, portname, str); } else { fprintf(stderr, "You want me to publish how??? 
I have no idea what you're talking about, bub."); } return 0; } static int publishFile(char *pipepath, char *pipe, char *portname, char *str) { char *publishRoot; char *filename; FILE *ofile; if((publishRoot = getenv("INFOPIPE_REPOSITORY")) == NULL) { publishRoot = getenv("PWD"); } filename = (char *)malloc( sizeof(char) * (strlen(publishRoot) + strlen("running/") + strlen(pipepath) + strlen(pipe) + strlen(portname) + 10)); filename[0] = 0; strcat(filename, publishRoot); if (filename[strlen(publishRoot)-1] != '/') strcat(filename, "/"); strcat(filename,"running/"); strcat(filename, pipepath); strcat(filename, "/"); strcat(filename, pipe); strcat(filename, "/"); strcat(filename, portname); makePathDir(filename,0); printf("publishing file = %s\n", filename); ofile = fopen(filename,"w"); if(ofile) { fprintf(ofile,"%s\n",str); } else { printf("Error in publishing file %s\n", filename); } fclose(ofile); return 0; } int unpublish(char *pipepath, char *pipe, char *portname, int type) { if (type & PUBLISH_FILE) { unpublishFile(pipepath, pipe, portname); } return 0; } static int unpublishFile(char *pipepath, char *pipe, char *portname) { char *publishRoot; char *filename; if ((publishRoot = getenv("INFOPIPE_REPOSITORY")) == NULL) { publishRoot = getenv("PWD"); } filename = (char *) malloc(sizeof(char) * (strlen(publishRoot) + strlen(pipepath) + strlen(pipe) + strlen(portname) + 10)); filename[0] = 0; strcat(filename, publishRoot); if (filename[strlen(publishRoot)-1] != '/') strcat(filename, "/"); strcat(filename, pipepath); strcat(filename, "/"); strcat(filename, pipe); strcat(filename, "/"); strcat(filename, portname); return remove(filename); } static int makePathDir(char *path, mode_t mode) { char *dir; char *p; mode = 0777; dir = (char *)malloc(sizeof(char) * (strlen(path) + 1)); p = path; while(1) { p = strchr(p, '/'); if (p == NULL) { free( dir ); return 0; } p++; memcpy(dir, path, p - path); dir[p - path] = 0; if (mkdir(dir, mode)) { if (errno != EEXIST) { free(dir); 
return -1; } } } return 0; } #ifndef INFOPIPEsenderDATATYPES #define INFOPIPEsenderDATATYPES #include <stdlib.h> #include <stdio.h> typedef struct ppmStructDef { int width; // an infopipe integer // int height; // an infopipe integer // int maxval; // an infopipe integer // int data_size; // an infopipe integer // char* data; // an infopipe array of byte // } ppmStruct, * ppmStruct_ptr; int sender(); int infopipe_sender_startup(); int infopipe_sender_shutdown(); #endif // INFOPIPEsenderDATATYPES #include "sender.h" #include "ppmOut.h" | int sender( ) { ; // USER DECLARES VARS HERE ; // USER CODE GOES HERE return 0; } int infopipe_sender_startup() { infopipe_ppmOut_startup(); return 0; } int infopipe_sender_shutdown() { infopipe_ppmOut_shutdown(); return 0; } int readPPMFile(ppmStruct *image); int main(int argc, char **argv) { int i; infopipe_sender_startup(); for(i=0; i < 150; i++) { readPPMFile(&ppmOut); infopipe_ppmOut_push(); } infopipe_ppmOut_shutdown(); return 0; } int readPPMFile(ppmStruct *image) { char fname[] ="/home/zorn/aspectTest/asp/img0017.ppm"; FILE *fp; char magic[10]; int wd, ht, maxval, size; fp = fopen(fname, "r"); fscanf(fp, "%s %d %d %d ", magic, &wd, &ht, &maxval); size = ht * wd * 3; if(image->data == NULL) image->data = (char *) malloc(sizeof(char) * size); image->height = ht; image->width = wd; image->maxval = maxval; image->data_size = fread((void *) image->data, 1, size, fp); return size; } #ifndef INFOPIPEECHOINCLUDED#define INFOPIPEECHOINCLUDED int infopipe_socket_startup(); int infopipe_socket_shutdown(); #endif // INFOPIPEECHOINCLUDED #include "sender.h" #include <sys/types.h> #include <sys/stat.h> #include <errno.h> int infopipe_socket_startup() { return 0; } int infopipe_socket_shutdown( ) { return 0; } #ifndef INFOPIPEppmOutINCLUDED #define INFOPIPEppmOutINCLUDED #include "sender.h" // datatype defs, structs int infopipe_ppmOut_startup(); int infopipe_ppmOut_shutdown(); int infopipe_ppmOut_push(); int drive(); extern ppmStruct 
ppmOut; int infopipe_ppmOutdata_resize( int newsize ); #endif // INFOPIPEppmOutINCLUDED #include <sys/param.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/param.h> #include <sys/wait.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <netdb.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <pthread.h> #include "control.h" pthread_t control_thread_id; extern void *control_thread(void *args); #include "runtime.h" // runtime utility funcs in c#include "sender.h" // datatype defs, structs #include "ppmOut.h" // func prototypes for this port static int ppmOutSocket; ppmStruct ppmOut; int infopipe_ppmOut_data_resize( int newsize ) { void *newspace; fprintf(stdout, "resizing data to %d\n",newsize); newspace = realloc( ppmOut.data, newsize * sizeof(char ) ); if(!newspace) { return 1; } ppmOut.data = (char *)newspace; ppmOut.data_size = newsize; return 0; } static int writeSock( void *data, ssize_t nbytes ) { int ret; ret = write( ppmOutSocket, data, nbytes ); fprintf(stdout, "Writing %d bytes.\n", nbytes ); if( ret == -1 ) { switch(errno) { case EBADF : fprintf(stderr, "EBADF write failed for ppmOut."); break; case EINVAL : fprintf(stderr, "EINVAL write failed for ppmOut."); break; case EFAULT: fprintf(stderr, "EFAULT write failed for ppmOut."); break; case EFBIG: fprintf(stderr, "EFBIG write failed for ppmOut."); break; case EPIPE: fprintf(stderr, "EPIPE write failed for ppmOut."); break; case EAGAIN: fprintf(stderr, "EAGAIN write failed for ppmOut."); break; case EINTR: fprintf(stderr, "EINTR write failed for ppmOut. 
Trying again."); writeSock( data, nbytes ); break; case ENOSPC: fprintf(stderr, "ENOSPC write failed for ppmOut."); break; case EIO: fprintf(stderr, "EIO write failed for ppmOut."); break; } perror("Died in writeSock"); exit(99); } return ret; } #define writechar( data ) writeSock( data, sizeof(char) ) #define writebyte( data ) writeSock( data, sizeof(char) ) #define writeinteger( data ) writeSock( data, sizeof(int) ) #define writefloat( data ) writeSock( data, sizeof(float) ) #define writedouble( data ) writeSock( data, sizeof(double) ) #define writelong( data ) writeSock( data, sizeof(long) ) #define writestring( data ) writeSock( data, strlen((char *)data) ) #define writearraychar( data, nelem ) writeSock( data, sizeof(char) * nelem ) #define writearraybyte( data, nelem ) writeSock( data, sizeof(char) * nelem ) #define writearrayinteger( data, nelem ) writeSock( data, sizeof(int) * nelem ) #define writearrayfloat( data, nelem ) writeSock( data, sizeof(float) * nelem ) #define writearraydouble( data, nelem ) writeSock( data, sizeof(double) * nelem ) #define writearraylong( data, nelem ) writeSock( data, sizeof(long) * nelem ) int infopipe_ppmOut_push() { writeinteger( &ppmOut.width ); writeinteger( &ppmOut.height ); writeinteger( &ppmOut.maxval ); writeinteger( &ppmOut.data_size ); writearraybyte( ppmOut.data, ppmOut.data_size ); if( sleep_usec ) { usleep( sleep_usec ); } return 0;} int infopipe_ppmOut_startup() { char *conninfo; char *port; int portNum; struct sockaddr_in sin; struct hostent *hptr; if (pthread_create ( &control_thread_id, NULL, control_thread, NULL) !=0 ) { perror("Unable to create control thread"); exit(0); } ppmOut.data = 0; // NULL ptr initiallylookup( "", "aspectTest/receiver", "ppmIn", PUBLISH_FILE, &conninfo ); port = strchr( conninfo, ':' ); portNum = atoi( port + 1 ); *port = 0; // null term end of host name fprintf(stdout, "Connection to %s:%d.\n\n", conninfo, portNum ); ppmOutSocket = socket(PF_INET,SOCK_STREAM,0); sin.sin_family = 
AF_INET; sin.sin_port = htons(portNum); hptr = gethostbyname( conninfo ); memcpy(&sin.sin_addr.s_addr,hptr->h_addr_list[0],hptr->h_length); if( connect(ppmOutSocket, (struct sockaddr *)&sin, sizeof(sin)) ) { fprintf(stderr, "Unable to connect to aspectTest/receiver:ppmIn\n"); fprintf(stderr, " at location '%s:%d'\n\n", conninfo, portNum); switch(errno) { case EBADF: fprintf(stderr, "EBADF\n"); break; case EFAULT: fprintf(stderr, "EFAULT\n"); break; case ENOTSOCK: fprintf(stderr, "ENOTSOCK\n"); break; case EISCONN: fprintf(stderr, "EISCONN\n"); break; case ECONNREFUSED: fprintf(stderr, "ECONNREFUSED\n"); break; case ETIMEDOUT: fprintf(stderr, "ETIMEDOUT\n"); break; case ENETUNREACH: fprintf(stderr, "ENETUNREACH\n"); break; case EADDRINUSE: fprintf(stderr, "EADDRINUSE\n"); break; case EINPROGRESS: fprintf(stderr, "EINPROGRESS\n"); break; case EALREADY: fprintf(stderr, "EALREADY\n"); break; case EAGAIN: fprintf(stderr, "EAGAIN\n"); break; case EAFNOSUPPORT: fprintf(stderr, "EAFNOSUPPORT\n"); break; case EACCES: fprintf(stderr, "EACCES\n"); break; case EPERM: fprintf(stderr, "EPERM\n"); break; } perror("Exiting..."); fprintf(stderr, "\n."); exit(10); } sleep(3); fprintf(stdout,"Submit socket initialized \n"); free( conninfo ); | return ppmOutSocket; } int infopipe_ppmOut_shutdown() { int ret; ret = close( ppmOutSocket ); fprintf(stdout, "We done closed ppmOut."); return 0; } * This was for doing debug with oneTwoTest.xip int drive() { int i; int itag = 999; char cppm1 = 'a'; char cppm2 = 'b'; int iwidth = 640; int iheight = 480; int imaxval = 35567; char *str = "this is a test."; infopipe_sendout_bytebuff_resize( 20 ); fprintf(stdout, "bytebuff stated size is %d\n", sendout.bytebuff_size); infopipe_sendout_description_resize( strlen(str) + 10 ); strcpy(sendout.description, str ); while( 1 ) { sendout.tag = itag; sendout.ppm1 = cppm1; sendout.ppm2 = cppm2; sendout.height = iheight; sendout.width = iwidth; for( i = 0; i < 30; i++ ) sendout.intarr[i] = i; for( i = 0; i < 
sendout.bytebuff_size; i++ ) sendout.bytebuff[i] = 'a' + (i%('z'-'A')); fprintf(stdout,"Push...\n"); infopipe_sendout_push(); fprintf(stdout," ...done.\n"); sleep(3); } } */ CC = gcc LD = gcc all:: post-all #call comm layer make files to build ports OBJS := $(OBJS) ppmIn.o ppmIn.o: ppmIn.c $(CC) -Wall -g -c ppmIn.c #build the body of the pipe receiver.o: receiver.c $(CC) -Wall -g -c receiver.c #build the C library runtime.o: receiver.c $(CC) -Wall -g -c runtime.c OBJS := $(OBJS) sla_receiver.o sla_receiver.o: sla.c $(CC) -Wall -g -o sla_receiver.o -c sla.c OBJS := $(OBJS) control_receiver.o LDFLAGS := $(LDFLAGS) -lpthread control_receiver.o: control.c $(CC) -Wall -g -o control_receiver.o -c control.c #linkreceiver: receiver.o runtime.o ${OBJS} # ar rcs libreceiver.a receiver.o ppmOut.o ppmIn.o ${LD} -o $@ receiver.o runtime.o ${OBJS} ${LDFLAGS} clean: rm -f receiver.o $(OBJS) runtime.o post-all: receiver.o ppmIn.o ${OBJS} runtime.o #ifndef CRUNTIMEINCLUDED #define CRUNTIMEINCLUDED #define PUBLISH_FILE 0x00000001 int lookup(char *pipepath, char *pipe, char *portName, int publish_where, char **str); int publish(char *pipepath, char *pipe, char *portName, char *str, int type); int unpublish(char *pipepath, char *pipe, char *portname, int type); #endif #include <string.h> #include <stdio.h> #include <stdlib.h> #include <sys/stat.h> #include <errno.h> #include "runtime.h" static int makePathDir(char *, mode_t ); static int lookupFile(char *pipepath, char *pipe, char *portname, char **str); static int publishFile(char *pipepath, char *pipe, char *portname, char *str); static int unpublishFile(char *pipepath, char *pipe, char *portname); int lookup(char *pipepath, char *pipe, char *portname, int publish_where, char **str) { if (publish_where & PUBLISH_FILE) { return lookupFile(pipepath, pipe, portname, str); } fprintf(stderr,"LOOKUP TYPE UNKNOWN."); return 8; } static int lookupFile(char *pipepath, char *pipe, char *portname, char **str) { char *publishRoot; char 
*filename; FILE *ifile; struct stat filestat; if((publishRoot = getenv("INFOPIPE_REPOSITORY")) == NULL) { publishRoot = getenv("PWD"); } filename = (char *)malloc(sizeof(char) * (strlen(publishRoot) + strlen("running/") + strlen(pipepath) + strlen(pipe) + strlen(portname) + 10) ); filename[0] = 0; strcat(filename, publishRoot); if( filename[strlen(publishRoot)-1] != '/') strcat(filename, "/"); strcat( filename, "running/"); strcat( filename, pipepath); if( filename[ strlen(filename) - 1 ] != '/' ) strcat( filename, "/"); strcat( filename, pipe); strcat( filename, "/"); strcat( filename, portname); fprintf(stdout, "Trying to read '%s'.\n", filename); if( stat( filename, &filestat ) ) { fprintf(stderr, "'%s' did not work as a file.", filename ); return 8; } fprintf(stdout, "file size is %ld.\n", filestat.st_size ); *str = (char *) malloc(filestat.st_size * sizeof(char) + 20); if( !str ) { fprintf(stderr, "lookupFile() malloc() failed.\n" ); } ifile = fopen( filename, "r"); if(ifile && !feof(ifile)) { fgets(*str, filestat.st_size + 1, ifile); } else { fprintf(stderr,"Error in reading file '%s'.\n", filename); return 3; } return 0; } int publish(char *pipepath, char *pipe, char *portname, char *str, int type) { if(type & PUBLISH_FILE) { publishFile(pipepath, pipe, portname, str); } else { fprintf(stderr, "You want me to publish how??? 
I have no idea what you're talking about, bub."); } return 0; } static int publishFile(char *pipepath, char *pipe, char *portname, char *str) { char *publishRoot; char *filename; FILE *ofile; if((publishRoot = getenv("INFOPIPE_REPOSITORY")) == NULL) { publishRoot = getenv("PWD"); } filename = (char *)malloc( sizeof(char) * (strlen(publishRoot) + strlen("running/") + strlen(pipepath) + strlen(pipe) + strlen(portname) + 10)); filename[0] = 0; strcat(filename, publishRoot); if (filename[strlen(publishRoot)-1] != '/') strcat(filename, "/"); strcat(filename,"running/"); strcat(filename, pipepath); strcat(filename, "/"); strcat(filename, pipe); strcat(filename, "/"); strcat(filename, portname); makePathDir(filename,0); printf("publishing file = %s\n", filename); ofile = fopen(filename,"w"); if(ofile) { fprintf(ofile,"%s\n",str); } else { printf("Error in publishing file %s\n", filename); } fclose(ofile); return 0; } int unpublish(char *pipepath, char *pipe, char *portname, int type) { if (type & PUBLISH_FILE) { unpublishFile(pipepath, pipe, portname); } return 0; } static int unpublishFile(char *pipepath, char *pipe, char *portname) { char *publishRoot; char *filename; if ((publishRoot = getenv("INFOPIPE_REPOSITORY")) == NULL) { publishRoot = getenv("PWD"); } filename = (char *) malloc(sizeof(char) * (strlen(publishRoot) + strlen(pipepath) + strlen(pipe) + strlen(portname) + 10)); filename[0] = 0; strcat(filename, publishRoot); if (filename[strlen(publishRoot)-1] != '/') strcat(filename, "/"); strcat(filename, pipepath); strcat(filename, "/"); strcat(filename, pipe); strcat(filename, "/"); strcat(filename, portname); return remove(filename); } static int makePathDir(char *path, mode_t mode) { char *dir; char *p; mode = 0777; dir = (char *)malloc(sizeof(char) * (strlen(path) + 1)); p = path; while(1) { | p = strchr(p, '/'); if (p == NULL) { free( dir ); return 0; } p++; memcpy(dir, path, p - path); dir[p - path] = 0; if (mkdir(dir, mode)) { if (errno != EEXIST) { 
free(dir); return -1; } } } return 0; } #ifndef INFOPIPEreceiverDATATYPES #define INFOPIPEreceiverDATATYPES #include <stdlib.h> #include <stdio.h> typedef struct ppmStructDef { int width; // an infopipe integer // int height; // an infopipe integer // int maxval; // an infopipe integer // int data_size; // an infopipe integer // char* data; // an infopipe array of byte // } ppmStruct, * ppmStruct_ptr; int receiver(); int infopipe_receiver_startup(); int infopipe_receiver_shutdown(); #endif // INFOPIPEreceiverDATATYPES #include "receiver.h" #include "ppmIn.h" #include "control.h" #include <sys/time.h> #include <stdio.h> extern long usec_to_port_startup; extern long usec_to_port_shutdown; extern long usec_to_recv; long usec_to_pipe_startup; long usec_to_pipe_shutdown; long usec_to_process; #include <sys/time.h> #include <sys/resource.h> #include <unistd.h> float CPUUsage; static long lastUTimeUse = 0; static long lastSTimeUse = 0; static struct rusage usingNow; #include "sla.h" int receiver( ) {; // USER DECLARES VARS HERE long red=0,green=0,blue=0; int i, width, height; struct timeval base; struct timeval end; gettimeofday(&base,NULL); ; // USER CODE GOES HERE printf("%d %d %d\n", ppmIn.width, ppmIn.height, ppmIn.maxval ); for( i = 0; i < ppmIn.width*ppmIn.height; i+=3 ) { red += ppmIn.data[i]; green += ppmIn.data[i+1]; blue += ppmIn.data[i+2]; } red = red /ppmIn.data_size; green = green/ppmIn.data_size; blue = blue/ppmIn.data_size; printf("%ld %ld %ld\n", red, green, blue); gettimeofday(&end,NULL); usec_to_process = (end.tv_sec - base.tv_sec ) * 1e6 + (end.tv_usec - base.tv_usec); fprintf(stdout,"Time to process: %ld\n", usec_to_process); getrusage( RUSAGE_SELF, &usingNow ); CPUUsage = ((float) usingNow.ru_utime.tv_usec + usingNow.ru_stime.tv_usec - lastUTimeUse + ((float) usingNow.ru_utime.tv_sec + usingNow.ru_stime.tv_sec - lastSTimeUse) * 1.0e6) / (usec_to_recv + usec_to_process); lastUTimeUse = usingNow.ru_utime.tv_usec + usingNow.ru_stime.tv_usec; lastSTimeUse 
= usingNow.ru_utime.tv_sec + usingNow.ru_stime.tv_sec; fprintf(stdout, "Use pct %0.2f.\n", CPUUsage * 100); processSLA(); return 0;} int infopipe_receiver_startup() { struct timeval base; struct timeval end; gettimeofday(&base,NULL); infopipe_ppmIn_startup(); gettimeofday(&end,NULL); usec_to_pipe_startup = (end.tv_sec - base.tv_sec ) * 1e6 + (end.tv_usec - base.tv_usec); fprintf(stdout,"Time to pipe startup: %ld\n", usec_to_pipe_startup); return 0;} int infopipe_receiver_shutdown() { struct timeval base; struct timeval end; gettimeofday(&base,NULL); infopipe_ppmIn_shutdown(); gettimeofday(&end,NULL); usec_to_pipe_shutdown = (end.tv_sec - base.tv_sec ) * 1e6 + (end.tv_usec - base.tv_usec); fprintf(stdout,"Time to pipe shutdown: %ld\n", usec_to_pipe_shutdown); return 0;} int main() { infopipe_receiver_startup(); drive(); return 0; } #ifndef INFOPIPEECHOINCLUDED#define INFOPIPEECHOINCLUDED int infopipe_socket_startup(); int infopipe_socket_shutdown(); #endif // INFOPIPEECHOINCLUDED #include "receiver.h" #include <sys/types.h> #include <sys/stat.h> #include <errno.h> int infopipe_socket_startup() { return 0; } int infopipe_socket_shutdown( ) { return 0; } #ifndef INFOPIPEppmInINCLUDED #define INFOPIPEppmInINCLUDED #include "receiver.h" // datatype defs, structs int drive(); int infopipe_ppmIn_startup(); int infopipe_ppmIn_shutdown(); void infopipe_ppmIn_receiveloop(); extern ppmStruct ppmIn; int infopipe_ppmIndata_resize( int newsize ); #endif // INFOPIPEppmInINCLUDED #include <sys/param.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/param.h> #include <sys/wait.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <netdb.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "runtime.h" // runtime utility funcs in c #include "ppmIn.h" // func prototypes for this port #include "control.h" #include <sys/time.h> #include <stdio.h> long usec_to_port_startup; long 
usec_to_port_shutdown; long usec_to_recv; long usec_middle; static int ppmInSocket;ppmStruct ppmIn; static void infopipe_ppmIn_readnet(); static int readSock( void *dest, ssize_t nbytes ) { ssize_t bytesRead, totalBytesRead; bytesRead = 0; for( totalBytesRead = 0; nbytes - totalBytesRead; totalBytesRead += bytesRead) { bytesRead = read( ppmInSocket, dest + totalBytesRead, nbytes - totalBytesRead ); } return 0; } #define readchar( dest ) readSock( dest, sizeof(char)) #define readbyte( dest ) readSock( dest, sizeof(char)) #define readinteger( dest ) readSock( dest, sizeof(int)) #define readfloat( dest ) readSock( dest, sizeof(float) ) #define readdouble( dest ) readSock( dest, sizeof(double) ) #define readlong( dest ) readSock( dest, sizeof(long) ) #define readarraychar( dest, nelem ) readSock( dest, sizeof(char) * nelem ) #define readarraybyte( dest, nelem ) readSock( dest, sizeof(char) * nelem ) #define readarrayinteger( dest, nelem ) readSock( dest, sizeof(int) * nelem ) #define readarrayfloat( dest, nelem ) readSock( dest, sizeof(float) * nelem ) #define readarraydouble( dest, nelem ) readSock( dest, sizeof(double) * nelem ) #define readarraylong( dest, nelem ) readSock( dest, sizeof(long) * nelem) int infopipe_ppmIn_data_resize( int newsize ) { void *newspace; newspace = realloc( ppmIn.data, newsize * sizeof(char ) ); if(!newspace) { return 0; } ppmIn.data = (char *)newspace; ppmIn.data_size = newsize; return 1; } int infopipe_ppmIn_startup() { int listenSock; int ret; struct sockaddr_in sin; socklen_t alen; if (pthread_create ( &control_thread_id, NULL, receiver_control_thread, NULL) !=0 ) { perror("Unable to create control thread"); exit(0); } struct timeval base; struct timeval end; gettimeofday(&base,NULL); ppmIn.data = 0; // NULL ptr initiallylistenSock = socket(PF_INET, SOCK_STREAM, 0); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(INADDR_ANY); // accept anyone sin.sin_port = 0; // let the OS assign a port to avoid conflicts/old sockets b/c we 
publish the info | ret = bind(listenSock, (struct sockaddr *)&sin, sizeof(sin)); if( ret < 0 ) { fprintf(stderr, "Bind failed for socket."); exit(8); } alen = sizeof( sin ); ret = getsockname(listenSock, (struct sockaddr *)&sin, &alen); if( ret < 0 ) { fprintf(stderr, "Getsockname failed for socket."); exit(8); } fprintf(stdout,"PipeName: %s \n", "receiver"); fprintf(stdout,"Portname: %s \n", "ppmIn"); fprintf(stdout," PID: %d\n",getpid()); fprintf(stdout,"TCP port: %d \n",ntohs(sin.sin_port)); fprintf(stdout,"TCP host: %d \n",ntohl(sin.sin_addr.s_addr)); { char buffer[MAXHOSTNAMELEN+10]; gethostname( buffer, MAXHOSTNAMELEN ); snprintf(buffer+strlen(buffer), 10, ":%d\n", ntohs(sin.sin_port)); publish( "./aspectTest", "receiver", "ppmIn", buffer, PUBLISH_FILE); } listen(listenSock, 1); alen = sizeof(sin); fprintf(stdout, "Waiting for a single connection.\n"); ppmInSocket = accept(listenSock,(struct sockaddr *)&sin, &alen); fprintf(stdout,"Connected... (hopefully to the right pipe!)\n"); close(listenSock); gettimeofday(&end,NULL); usec_to_port_startup = (end.tv_sec - base.tv_sec ) * 1e6 + (end.tv_usec - base.tv_usec); fprintf(stdout,"Time to port startup: %ld\n", usec_to_port_startup); return ppmInSocket;} static void infopipe_ppmIn_readnet() { readinteger( &(ppmIn.width) ); readinteger( &(ppmIn.height) ); readinteger( &(ppmIn.maxval) ); readinteger( &(ppmIn.data_size) ); { ssize_t readlength; int success; readlength = ppmIn.data_size; success = infopipe_ppmIn_data_resize( ppmIn.data_size ); if( !success ) { fprintf( stderr, "./aspectTest/receiver/ppmIn:data:Malloc failed in read.\n" ); } readarraybyte( ppmIn.data, readlength ); } } void infopipe_ppmIn_receiveloop() { while( 1 ) { struct timeval base; struct timeval end; gettimeofday(&base,NULL); infopipe_ppmIn_readnet(); gettimeofday(&end,NULL); usec_to_recv = (end.tv_sec - base.tv_sec ) * 1e6 + (end.tv_usec - base.tv_usec); fprintf(stdout,"Time to recv: %ld\n", usec_to_recv); {int i; fprintf(stdout,"data:\n"); for( 
i = 0; i < ppmIn.data_size; i++ ) { fprintf(stdout,"%c ", ppmIn.data[i]); if( !((i+1)%10) ) fprintf(stdout, "\n"); } } */ receiver(); } } int infopipe_ppmIn_shutdown() { int ret; struct timeval base; struct timeval end; gettimeofday(&base,NULL); ret = close( ppmInSocket );fprintf(stdout, "We done closed ppmIn"); unpublish( "./aspectTest", "receiver", "ppmIn", PUBLISH_FILE); gettimeofday(&end,NULL); usec_to_port_shutdown = (end.tv_sec - base.tv_sec ) * 1e6 + (end.tv_usec - base.tv_usec); fprintf(stdout,"Time to port shutdown: %ld\n", usec_to_port_shutdown); control_shutdown(); return 0;} int drive() { fprintf(stdout,"Driving.\n"); infopipe_ppmIn_receiveloop(); return 0; } #ifndef __CONTROL_H #define __CONTROL_H 1 typedef struct INFOCP_t { int controlFlag; long octets_following; }CtrlPreamble, *CtrlPreamblePtr; #define TERM 1 // TERMINATE CONTROL SOCKET #include "sla.h" #include "control.h" #endif #include <sys/param.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/param.h> #include <sys/wait.h> #include <sys/time.h> #include <pthread.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <netdb.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "control.h" #include "runtime.h" // runtime utility funcs in c int controlInSocket; pthread_t control_thread_id; extern int sleep_time; // sleep time in microseconds static int control_thread_startup(); void *control_thread(void *args) { control_thread_startup(); fprintf(stdout, "Control thread created\n"); while(1) { CtrlPreamble ctrlIn; if(recv( controlInSocket, &ctrlIn, sizeof(ctrlIn), MSG_WAITALL) != sizeof(ctrlIn)) { perror("Error reading Control Information\n"); continue; } else { fprintf(stderr,"control.c: Received control message.\n"); } switch( ctrlIn.controlFlag ) { case NOTIFICATION: { rateCmdRecd(); fprintf(stderr, "Control thread receives a rate msg\n"); break; } case TERM: { fprintf(stdout, "Control gets TERM\n"); 
close(controlInSocket); fprintf(stdout, "Control thread done\n"); pthread_exit(0); break; } default: { fprintf(stderr, "Control thread: what means message code %d?\n", ctrlIn.controlFlag); break; } } } return 0; } int control_thread_startup() { char *conninfo; char *port; int portNum; struct sockaddr_in sin; struct hostent *hptr; lookup( "", "aspectTest/receiver", "Control", PUBLISH_FILE, &conninfo ); port = strchr( conninfo, ':' ); portNum = atoi( port + 1 ); *port = 0; // null term end of host name fprintf(stdout, "Connection to %s:%d.\n\n", conninfo, portNum ); controlInSocket = socket(PF_INET,SOCK_STREAM,0); sin.sin_family = AF_INET; sin.sin_port = htons(portNum); hptr = gethostbyname( conninfo ); memcpy(&sin.sin_addr.s_addr,hptr->h_addr_list[0],hptr->h_length); if( connect(controlInSocket, (struct sockaddr *)&sin, sizeof(sin)) ) { fprintf(stderr, "Unable to connect to aspectTest/receiver:ppmIn\n"); fprintf(stderr, " at location '%s:%d'\n\n", conninfo, portNum); switch(errno) { case EBADF: fprintf(stderr, "EBADF\n"); break; case EFAULT: fprintf(stderr, "EFAULT\n"); break; case ENOTSOCK: fprintf(stderr, "ENOTSOCK\n"); break; case EISCONN: fprintf(stderr, "EISCONN\n"); break; case ECONNREFUSED: fprintf(stderr, "ECONNREFUSED\n"); break; case ETIMEDOUT: fprintf(stderr, "ETIMEDOUT\n"); break; case ENETUNREACH: fprintf(stderr, "ENETUNREACH\n"); break; case EADDRINUSE: fprintf(stderr, "EADDRINUSE\n"); break; case EINPROGRESS: fprintf(stderr, "EINPROGRESS\n"); break; case EALREADY: fprintf(stderr, "EALREADY\n"); break; case EAGAIN: fprintf(stderr, "EAGAIN\n"); break; case EAFNOSUPPORT: fprintf(stderr, "EAFNOSUPPORT\n"); break; case EACCES: fprintf(stderr, "EACCES\n"); break; case EPERM: fprintf(stderr, "EPERM\n"); break; } perror("Exiting..."); fprintf(stderr, "\n."); exit(10); } fprintf(stdout,"Control sender side initialized \n"); free( conninfo ); return controlInSocket; } #ifndef __SLA_H #define __SLA_H 1 typedef enum metricID { cpuusageMetric }metricID_T; #define 
MAXNAMELEN 64 typedef struct notification_t { char slaname[MAXNAMELEN]; | char sender[MAXNAMELEN]; char slaparam[MAXNAMELEN]; float val; }notificationMsg_T; #define NOTIFICATION 10 #define INFORMATION 11 #define VIOLATION 12 #define BACKTONORMAL 13 enum { VIOLATED, ACCEPTABLE }; #define RATE 2 // Rate flag constant extern long sleep_usec; // micro-secs to sleep after x-mit int rateCmdRecd(); extern int controlInSocket; #endif /* SLA_H */ #include <sys/socket.h> #include <sys/param.h> #include <sys/time.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "control.h" #include "sla.h" long sleep_usec = 0; // micro-secs to sleep after x-mit int rateCmdRecd() { float usePct; notificationMsg_T nmsg; int nread = recv( controlInSocket, &nmsg, sizeof( notificationMsg_T ), MSG_WAITALL ); usePct = nmsg.val; printf("control send side read %f percent.\n", usePct); if( usePct > ((float)20/100+ 0.05) ) { if( sleep_usec == 0) { sleep_usec = 1e5; // start at 0.1 sec sleep time } else { sleep_usec = sleep_usec * 1.1; } fprintf(stdout,"Slowing down.\n"); } else { if( usePct < ((float)20/100 - 0.05) ) { if( sleep_usec != 0 ) { sleep_usec = sleep_usec / 1.1; if( sleep_usec < .0001 ) sleep_usec = 0; fprintf(stdout,"Speeding up.\n"); } } } return nread; } #ifndef __CONTROL_H #define __CONTROL_H 1 #include <pthread.h> typedef struct INFOCP_t { int controlFlag; long octets_following; }CtrlPreamble, *CtrlPreamblePtr; #define TERM 1 // TERMINATE CONTROL SOCKET extern pthread_t control_thread_id; extern int controlOutSocket; void *receiver_control_thread(void *args); int control_shutdown(); int controlMsg( int which, void *what, size_t len); #endif #include <sys/param.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> #include <sys/param.h> #include <sys/wait.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <netdb.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include 
<string.h> #include "control.h" #include "runtime.h" // runtime utility funcs in c static int control_thread_startup(); int controlOutSocket; // so named because the control flows back upstream out of this node pthread_t control_thread_id; extern int pkts_sent; void *receiver_control_thread(void *args) { int bytes_read; control_thread_startup(); fprintf(stdout, "Control thread created\n"); while(1) { char buffer[4096]; bytes_read = read( controlOutSocket, buffer, 4096 ); if( bytes_read == 0 ) { fprintf(stderr,"Control thread detects closed socket - exiting.\n\n"); exit(1); } } return 0; } static int control_thread_startup() { int listenSock; int ret; struct sockaddr_in sin; socklen_t alen; listenSock = socket(PF_INET, SOCK_STREAM, 0); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(INADDR_ANY); // accept anyone sin.sin_port = 0; // let the OS assign a port to avoid conflicts/old sockets b/c we publish the info ret = bind(listenSock, (struct sockaddr *)&sin, sizeof(sin)); if( ret < 0 ) { fprintf(stderr, "Bind failed for socket."); exit(8); } alen = sizeof( sin ); ret = getsockname(listenSock, (struct sockaddr *)&sin, &alen); if( ret < 0 ) { fprintf(stderr, "Getsockname failed for socket."); exit(8); } fprintf(stdout,"PipeName: %s \n", "receiver"); fprintf(stdout,"Portname: %s \n", "Control"); fprintf(stdout," PID: %d\n",getpid()); fprintf(stdout,"TCP port: %d \n",ntohs(sin.sin_port)); fprintf(stdout,"TCP host: %d \n",ntohl(sin.sin_addr.s_addr)); { char buffer[MAXHOSTNAMELEN+10]; gethostname( buffer, MAXHOSTNAMELEN ); snprintf(buffer+strlen(buffer), 10, ":%d\n", ntohs(sin.sin_port)); publish( "./aspectTest", "receiver", "Control", buffer, PUBLISH_FILE); } listen(listenSock, 1); alen = sizeof(sin); fprintf(stdout, "Control Thread: Waiting for a single connection.\n"); controlOutSocket = accept(listenSock,(struct sockaddr *)&sin, &alen); fprintf(stdout,"Control Thread Connected... 
(hopefully to the right pipe!)\n"); close(listenSock); return controlOutSocket; } int controlMsg( int which, void *what, size_t len) { CtrlPreamble cstructOut; cstructOut.controlFlag = which; cstructOut.octets_following = len; if(write( controlOutSocket, &cstructOut, sizeof(cstructOut)) != sizeof(cstructOut)) { perror("Error on ctrl msg write preamble portion"); return -1; } if(write( controlOutSocket, what, len) != len) { perror("Error on ctrl msg write data portion"); return -1; } printf("control receive side wrote %d bytes.\n",len); return 0; } int control_shutdown() { close(controlOutSocket); fprintf(stdout, "Closed The control Socket"); unpublish( "./aspectTest", "receiver", "Control", PUBLISH_FILE); pthread_exit(0); return 0; } #ifndef __SLA_H #define __SLA_H 1 typedef enum metricID { cpuusageMetric, }metricID_T; #define MAXNAMELEN 64 typedef struct notification_t { char slaname[MAXNAMELEN]; char sender[MAXNAMELEN]; char slaparam[MAXNAMELEN]; float val; }notificationMsg_T; #define SLA 2 // Rate flag constant #define NOTIFICATION 10 #define INFORMATION 11 #define VIOLATION 12 #define BACKTONORMAL 13 enum { VIOLATED, ACCEPTABLE }; void processSLA(); #endif /* SLA_H */ #include <sys/socket.h> #include <sys/param.h> #include <sys/time.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include "control.h" #include "sla.h" static int currentOperatingRegion; extern float CPUUsage; void checkCPUUsageSLO (void) { float slap; notificationMsg_T msg; slap = CPUUsage; strncpy(msg.slaname, "UAVDemoSLA", MAXNAMELEN); strncpy(msg.sender, "sender", MAXNAMELEN); strncpy(msg.slaparam, "CPUUsage", MAXNAMELEN); msg.val = slap; fprintf(stdout, " Sending notification.\n"); controlMsg(NOTIFICATION, &msg, sizeof(notificationMsg_T)); } void processSLA () { checkCPUUsageSLO(); } |