/*
 * vsib2multicast
 * (C) Jan Wagner 05/12/09
 *
 * Sends VSIB data as a UDP Multicast stream.
 * The transmission is started at the first 1PPS 
 * of the VSIB card.
 *
 * The UDP packets have an extra user header:
 * [ 32bit Unix time stamp | 32bit packet sequence nr | vsib data ]
 *
 * The values are in network byte order.
 * The packet sequence number runs continuously, even over
 * the one-second boundaries where the time stamp changes.
 *
 * The NTP time stamp from the first VSIB 1PPS is used
 * as the base second. The time stamp field is incremented
 * based on the VSIB data rate, not based on gettimeofday().
 *
 * The data rate is detected automatically: 64/128/256/512/1024 Mbps.
 *
 */

#define __USE_POSIX
#ifndef _FILE_OFFSET_BITS
#define _FILE_OFFSET_BITS 64
#endif
#ifndef _LARGEFILE64_SOURCE
#define _LARGEFILE64_SOURCE 1
#endif

#include "vsib_ioctl.h"

#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>

#include <errno.h>
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <malloc.h>
#include <fcntl.h>
#include <signal.h>
#include <math.h>

#define __USE_GNU
#include <sched.h>
#undef __USE_GNU
#include <pthread.h>
#include <limits.h>

// VSIB mode number that main() hands to vsib_start()
#define MY_VSIB_MODE 0x00

// Choose the UDP payload size so that a VSI/VLBA frame divides without
// remainder into several UDP packets, each smaller than a 1500-byte MTU.
// Every packet carries UDP_USER_HEADERLEN bytes of user header (two 32-bit
// words: time stamp and sequence number) followed by UDP_USER_PAYLOADLEN
// bytes of VSIB data.
#define VLBA_FRAME_SIZE     80000
#define UDP_USER_HEADERLEN  (2*sizeof(u_int32_t))
#define UDP_USER_PAYLOADLEN (VLBA_FRAME_SIZE/64) // 1250 bytes

// VSIB default: mode 0, no gigabit, no embed, no skip.
// Builds the argument for the VSIB_SET_MODE ioctl: the mode number plus the
// run flag. The gigabit and embedded-1PPS-marker flags are switched off by
// the constant-0 conditions, and the masked 16-bit skip value is 0.
#define VSIB_INIT_MODE(mode) (VSIB_MODE_MODE(mode) | VSIB_MODE_RUN | (0 ? VSIB_MODE_GIGABIT : 0) \
              | (0 ? VSIB_MODE_EMBED_1PPS_MARKERS : 0) | (0 & 0x0000ffff))

// Helpers private to this file; each one is defined further below main()
static void realtime_inits();
static void fullread(const int, char*, const ssize_t);
static int  nearest_pow2(int);
static void SIGINT_handler(int);
static int  vsib_start(int);
static void vsib_stop(int);
static void read_next_listfile_entry(FILE* f, char* scanname, u_int32_t* starttime, u_int32_t* stoptime);

// Set to 1 by SIGINT_handler(); the sending loop in main() polls it to exit
static volatile sig_atomic_t m_caught_sigint = 0;

/*
 * Program entry point.
 *
 * Usage: vsib2multicast <target_ip> <target_port> [listfile]
 *
 * Reads UDP_USER_PAYLOADLEN bytes at a time from the VSIB, prepends the
 * 8-byte user header (time stamp, sequence number, both in network byte
 * order) and sends the packet to the given multicast destination. When a
 * list file is given, the payload of every packet whose time stamp falls
 * inside the current list entry is also written to that entry's file.
 *
 * After each packets_per_second packets the elapsed wall-clock time is used
 * to estimate the data rate; when the nearest supported rate differs from
 * the assumed one, the VSIB is closed and re-opened and the time stamp is
 * re-seeded from gettimeofday().
 *
 * Returns 0 after SIGINT or a sendto() failure; exits with status 1 on
 * setup errors.
 */
int main(int argc, char *argv[])
{
    struct sockaddr_in addr;
    struct sigaction sa;
    struct timeval tv = { 0, 0 }; // wall-clock time at the start of the current second

    char* udpbuffer;
    int port = 0;

    u_int32_t current_second = 0;
    u_int32_t packet_seq_nr = 0;
    u_int32_t recording_start_second = 0;
    u_int32_t recording_last_second = 1; // inclusive, recording ends after this second

    int detected_Mbps = 128;
    u_int32_t packets_per_second = 0;
    u_int32_t packetnr_within_second = 0;

    int fd_vsib = -1;
    int fd_so = -1;
    int fd_outfile = -1;
    int first_pass = 1;
    FILE* fd_lstfile = NULL;
    char  scanfilename[255] = "";

    /* About */
    printf("VSIB data streaming tool 'vsib2multicast' v1.00\n");
    printf("(C) 2009 Jan Wagner, Metsahovi Radio Observatory\n\n");
    if (argc <= 2 || argc>4) {
        printf("Usage: ./vsib2multicast <target_ip> <target_port> [listfile]\n"
               "  target_ip   : some multicast group target ip from 224.0.0.0-239.255.255.255\n"  
               "  target_port : the port number\n"
               "  listfile    : file with list of times to record into current directory\n"
               "You can choose a free IP from http://www.iana.org/assignments/multicast-addresses/\n\n"
        );
        exit(1);
    }

    /* Create ordinary UDP socket and set some multicast configs */
    if ((fd_so=socket(AF_INET,SOCK_DGRAM,0)) < 0) {
        perror("failed to create socket");
        exit(1);
    }
    unsigned char sopt = 1;
    if (setsockopt(fd_so, IPPROTO_IP, IP_MULTICAST_LOOP, &sopt, sizeof(sopt)) < 0) {
        perror("IP_MULTICAST_LOOP");
    }
    sopt = 64;
    if (setsockopt(fd_so, IPPROTO_IP, IP_MULTICAST_TTL, &sopt, sizeof(sopt)) < 0) {
        perror("IP_MULTICAST_TTL");
    }
    printf("Configured multicast: TTL=%d, loopback=allowed\n", sopt);

    /* Set multicast destination, rejecting unusable arguments early */
    port = atoi(argv[2]);
    if (port < 1 || port > 65535) {
        fprintf(stderr, "Invalid target port '%s'\n", argv[2]);
        exit(1);
    }
    memset(&addr,0,sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = inet_addr(argv[1]);
    if (addr.sin_addr.s_addr == INADDR_NONE) {
        fprintf(stderr, "Invalid target IP '%s'\n", argv[1]);
        exit(1);
    }
    addr.sin_port        = htons(port);

    /* Register new SIGINT handler (no SA_RESTART: a blocked call returns on Ctrl-C) */
    memset(&sa, 0, sizeof(sa));
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sa.sa_handler = SIGINT_handler;
    sigaction(SIGINT, &sa, 0);

    /* General preparations */
    packets_per_second = (1e6*detected_Mbps/8)/UDP_USER_PAYLOADLEN;
    udpbuffer = memalign(128, UDP_USER_PAYLOADLEN + UDP_USER_HEADERLEN + 200);
    if (!udpbuffer) {
        perror("UDP buffer alloc failed");
        exit(1);
    }
    realtime_inits();
    if (argc == 4) {
        fd_lstfile = fopen(argv[3], "r");
        if (!fd_lstfile) {
            int saved_errno = errno; // printf() may overwrite errno before perror() runs
            printf("Could not open the list file %s : ", argv[3]);
            errno = saved_errno;
            perror(" ");
            exit(1);
        }
    }
 
    /* Start the VSIB */
    fd_vsib = vsib_start(MY_VSIB_MODE);

    /* Sending loop */
    while (!m_caught_sigint) {
        u_int32_t* header = (u_int32_t*)udpbuffer;

        /* Build UDP packet and header */
        fullread(fd_vsib, udpbuffer+UDP_USER_HEADERLEN, UDP_USER_PAYLOADLEN);
        if (first_pass) {
            /* Seed the time stamp from the wall clock, rounded to the nearest second */
            first_pass = 0;
            gettimeofday(&tv, NULL);
            current_second = lround((double)tv.tv_sec + 1e-6*tv.tv_usec);
            printf("First packet sent at second %f\n", tv.tv_sec + 1e-6*tv.tv_usec);
        }
        header[0] = htonl(current_second);
        header[1] = htonl(packet_seq_nr);
        packet_seq_nr++;

        /* Write the "file copy"; recording_last_second == 0 means the list is used up */
        if (fd_lstfile && (recording_last_second != 0)) {
            if (current_second > recording_last_second) {
                if (fd_outfile >= 0) {
                    close(fd_outfile);
                    fd_outfile = -1;
                }
                /* Skip entries that already ended; stop at end of file as well,
                   so that a stale last entry cannot keep this loop spinning */
                do {
                    read_next_listfile_entry(fd_lstfile, scanfilename, &recording_start_second, &recording_last_second);
                    printf("Candidate: %s :: %u :: %u\n", scanfilename, recording_start_second, recording_last_second);
                } while ((current_second > recording_last_second) && (recording_last_second != 0)
                         && !feof(fd_lstfile));
                if (current_second > recording_last_second) {
                    /* nothing usable is left in the list */
                    recording_last_second = 0;
                }
                if (recording_last_second != 0) {
                    printf("Recording scan: %s :: from %u to %u\n", scanfilename, recording_start_second, recording_last_second);
                    fd_outfile = creat(scanfilename, S_IWUSR|S_IRUSR|S_IROTH|S_IRGRP|S_IWGRP);
                    if (fd_outfile < 0) {
                        int saved_errno = errno;
                        printf("Could not create output file %s : ", scanfilename);
                        errno = saved_errno;
                        perror(" ");
                    }
                } else {
                    printf("No more scans in list file, file recording stopped\n");
                }
            }
            if ((fd_outfile >= 0) && (current_second >= recording_start_second && current_second <= recording_last_second)) {
                ssize_t nwr = write(fd_outfile, udpbuffer+UDP_USER_HEADERLEN, UDP_USER_PAYLOADLEN);
                if (nwr != (ssize_t)UDP_USER_PAYLOADLEN) {
                    /* Report once and give up on this scan file; streaming continues */
                    if (nwr < 0) {
                        perror("write to scan file failed");
                    } else {
                        printf("Short write to scan file %s, closing it\n", scanfilename);
                    }
                    close(fd_outfile);
                    fd_outfile = -1;
                }
            }
        }

        /* Send to IP multicast group address */
        if (sendto(fd_so, udpbuffer, UDP_USER_HEADERLEN+UDP_USER_PAYLOADLEN, 0, 
                   (struct sockaddr *) &addr, sizeof(addr)) < 0) 
        {
             perror("sendto");
             break;
        }

        /* Seconds timestamp and rate detection */
        packetnr_within_second++;
        if (packetnr_within_second >= packets_per_second) {
            struct timeval tvnow;
            float deltaT, Mbps;
            int pow2Mbps;

            /* One second worth of packets at the assumed rate has been sent;
               compare against the wall-clock time that actually elapsed.
               NOTE(review): deltaT comes from gettimeofday(); a clock step
               could make it zero or negative - not handled here. */
            gettimeofday(&tvnow, NULL);
            packetnr_within_second = 0;
            deltaT   = (tvnow.tv_sec - tv.tv_sec) + 1e-6*(tvnow.tv_usec - tv.tv_usec);
            Mbps     = 1e-6 * packets_per_second * 8.0*UDP_USER_PAYLOADLEN / deltaT;
            pow2Mbps = nearest_pow2((int)Mbps);

            /* Rate change: memorize new rate, re-open VSIB, it triggers at next 1PPS */
            if (pow2Mbps != detected_Mbps) {
                printf("Rate change detected: %d Mbps => %5.2f/%d Mbps, dT=%5.4f sec\n", 
                       detected_Mbps, Mbps, pow2Mbps, deltaT);
                detected_Mbps = pow2Mbps;
                packets_per_second = (1e6*detected_Mbps/8)/UDP_USER_PAYLOADLEN;
                first_pass = 1;
                vsib_stop(fd_vsib);
                fd_vsib = vsib_start(MY_VSIB_MODE);
                continue;
            }
            current_second++;
            printf("Second %u, one-sec average rate %5.2f\n", current_second, Mbps);
            tv = tvnow;
        }
    }

    /* Cleanup */
    printf("Exiting...\n");
    if (fd_outfile >= 0) {
        close(fd_outfile);
    }
    if (fd_lstfile) {
        fclose(fd_lstfile);
    }
    close(fd_so);
    vsib_stop(fd_vsib);
    free(udpbuffer);
    return 0;
}


///////////////////////////////////////////////////////////////////
// Tie to a single CPU core, prevent page swapping, boost priority
//
// Every step is best effort: a failure is reported as a warning and
// the remaining steps are still attempted. errno is saved before the
// warning is printed, because printf() is allowed to modify errno
// and perror() would otherwise report the wrong cause.
///////////////////////////////////////////////////////////////////
void realtime_inits()
{
    struct sched_param sp;
    cpu_set_t cpuset;
    int cpu = 0;
    int saved_errno;

    /* Pin the calling process (pid 0) to CPU#0 */
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) < 0) {
        saved_errno = errno;
        printf("Warning: could not tie CPU#%d\n", cpu);
        errno = saved_errno;
        perror("sched_setaffinity");
    } else {
        printf("Program tied to CPU#%d\n", cpu);
    }

    /* Switch to the round-robin real-time scheduler */
    memset(&sp, 0, sizeof(sp));
    sp.sched_priority = 50; //sched_get_priority_max(SCHED_RR); however, 99 not recommended!
    if (sched_setscheduler(0, SCHED_RR, &sp) < 0) {
        saved_errno = errno;
        printf("Warning: could not change to Round-Robin scheduler\n");
        errno = saved_errno;
        perror("sched_setscheduler");
    } else {
        printf("Changed to SCHED_RR with priority %d\n", sp.sched_priority);
    }

    /* Lock current and future pages into RAM */
    if (mlockall(MCL_FUTURE | MCL_CURRENT)) {
        saved_errno = errno;
        printf("Warning: mlockall() failed (not running as root?) with error\n");
        errno = saved_errno;
        perror("mlockall");
    } else {
        printf("Locked future allocs with mlockall()\n");
    }

    /* Adjust the malloc tunables for heap trimming and mmap-backed allocations */
    mallopt(M_TRIM_THRESHOLD, -1);
    mallopt(M_MMAP_MAX, 0);
    fflush(stdout);
}


///////////////////////////////////////////////////////////////////
// Control-C / SIGINT handler: only raises the flag that the
// sending loop in main() polls
///////////////////////////////////////////////////////////////////
void SIGINT_handler(int signum)
{
    (void)signum; // the signal number itself is not needed
    m_caught_sigint = 1;
}


///////////////////////////////////////////////////////////////////
// Read exactly n bytes from the VSIB or other file into b
//
// Keeps calling read() until all n bytes have arrived. A failed
// read() is reported with perror() and then retried; a read() that
// returns 0 adds nothing and is retried as well, so the function
// returns only once the whole request is satisfied.
///////////////////////////////////////////////////////////////////
void fullread(const int fd, char* b, const ssize_t n)
{
    char*   cursor    = b;
    ssize_t remaining = n;

    while (remaining > 0) {
        const ssize_t got = read(fd, cursor, remaining);
        if (got < 0) {
            perror("read()");
            continue;
        }
        cursor    += got;
        remaining -= got;
    }
}


///////////////////////////////////////////////////////////////////
// Return power-of-2 within 64-1024 that is nearest to given number
//
// Distance is measured linearly; a value exactly halfway between
// two powers resolves to the lower one. Inputs below 64 give 64,
// inputs of 1024 or more give 1024.
///////////////////////////////////////////////////////////////////
int nearest_pow2(int dec)
{
    int lower;

    if (dec < 64) {
        return 64;
    }
    for (lower = 64; lower < 1024; lower <<= 1) {
        const int upper = lower << 1;
        if (dec >= upper) {
            continue; // not inside [lower, upper) yet, try the next octave
        }
        return ((upper - dec) < (dec - lower)) ? upper : lower;
    }
    return lower; // 1024 here: the input was at or above the top of the range
}


///////////////////////////////////////////////////////////////////
// Open the VSIB for reading
//
// Opens /dev/vsib read-only and applies VSIB_INIT_MODE(mode) with
// the VSIB_SET_MODE ioctl. Returns the open file descriptor; the
// program exits with status 1 if either step fails.
///////////////////////////////////////////////////////////////////
int vsib_start(int mode)
{
    const int vsib_fd = open("/dev/vsib", O_RDONLY);

    if (vsib_fd < 0) {
        perror("failed to open /dev/vsib for reading");
        exit(1);
    }

    if (ioctl(vsib_fd, VSIB_SET_MODE, VSIB_INIT_MODE(mode)) != 0) {
        perror("/dev/vsib ioctl failed");
        exit(1);
    }

    printf("VSIB opened with vsib mode 0x%02x, embed=false, giga=false, skip=none\n", mode);
    return vsib_fd;
}


///////////////////////////////////////////////////////////////////
// Close the VSIB cleanly
//
// Requests a delayed DMA stop, polls once per second until the
// driver reports the DMA as done, then sets the stop mode and
// closes the descriptor. A failing ioctl is reported as a warning;
// if the "is DMA done" query itself fails, polling is abandoned
// instead of testing a value the driver never wrote.
///////////////////////////////////////////////////////////////////
void vsib_stop(int fd)
{
    if (ioctl(fd, VSIB_DELAYED_STOP_DMA, 0) < 0) {
        perror("VSIB_DELAYED_STOP_DMA ioctl failed");
    }
    while (1) {
        unsigned long done = 0;
        if (ioctl(fd, VSIB_IS_DMA_DONE, &done) < 0) {
            perror("VSIB_IS_DMA_DONE ioctl failed");
            break;
        }
        if (done) { break; }
        printf("waiting for VSIB DMA DONE...\n");
        sleep(1);
    }
    if (ioctl(fd, VSIB_SET_MODE, VSIB_MODE_STOP) < 0) {
        perror("VSIB_SET_MODE (stop) ioctl failed");
    }
    close(fd);
}


///////////////////////////////////////////////////////////////////
// Parse the next line in the listfile text file 
//
//   f         : list file, opened for reading
//   scanname  : receives the scan file name; must have room for
//               255 bytes (at most 254 characters are stored)
//   starttime : receives the Unix start second of the scan
//   stoptime  : receives the last second of the scan, inclusive
//               (start + duration - 1)
//
// When no further complete entry can be read (end of file or a
// malformed line) scanname is set to "" and both times to 0; a
// stoptime of 0 is the end-of-list marker the caller tests for.
///////////////////////////////////////////////////////////////////
void read_next_listfile_entry(FILE* f, char* scanname, u_int32_t* starttime, u_int32_t* stoptime)
{
    // Scanlist file format {scanname unix-starttime seconds-duration}:
    // euro99_Mh_145-1200_2009-05-25T12:00:00.evn 1243252800 30
    // euro99_Mh_145-1204_2009-05-25T12:04:09.evn 1243253049 40
    unsigned int start = 0;
    unsigned int duration = 0;

    // Always start from the end-of-list state so that stale values from
    // the previous entry can never be mistaken for a new one
    *scanname  = '\0';
    *starttime = 0;
    *stoptime  = 0;

    if (feof(f)) {
        return;
    }
    // The field width keeps an overlong name inside the caller's buffer
    if (fscanf(f, "%254s %u %u", scanname, &start, &duration) != 3) {
        *scanname = '\0'; // drop a partially parsed entry
        return;
    }
    *starttime = start;
    *stoptime  = start + duration - 1;
}

