/*
 * vsib2udp_client
 * (C) Jan Wagner 05/12/09
 *
 * Receives UDP packets sent by 'vsib2udp' or forwarded
 * from 'vsib2multicast' using 'vsib2multicast_recast'.
 *
 * The capture starting time stamp has to be specified.
 * It is compared to the seconds field in the UDP stream.
 * Data is recorded beginning from this time stamp and
 * continues for a certain specified amount of seconds.
 *
 * The UDP packets have an extra user header:
 * [ 32bit Unix time stamp | 32bit packet sequence nr | vsib data ]
 *
 * The values are in network byte order.
 * The packet sequence number runs continuously, even over
 * the one-second boundaries where the time stamp changes.
 *
 * The Unix time stamp is based on the NTP time on the
 * server when the first packet was sent. Subsequent
 * time stamps have been derived from the detected
 * data rate. 
 *
 * To get the definitive time stamp values you should
 * still parse the VSIB data and find the embedded
 * time stamps in that data (usually Mk4/VLBA).
 *
 */

#ifndef _FILE_OFFSET_BITS
#define _FILE_OFFSET_BITS 64
#endif
#ifndef _LARGEFILE64_SOURCE
#define _LARGEFILE64_SOURCE 1
#endif

#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>

#include <netdb.h>
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <malloc.h>
#include <fcntl.h>
#include <signal.h>

#define __USE_GNU
#include <sched.h>
#undef __USE_GNU
#include <pthread.h>
#include <limits.h>

#define VLBA_FRAME_SIZE     80000
#define UDP_USER_HEADERLEN  (2*sizeof(u_int32_t))

static void realtime_inits();
static void SIGINT_handler(int);
static int  connect_udp_listener(int port);

static volatile sig_atomic_t m_caught_sigint = 0;

int main(int argc, char *argv[])
{
    struct sigaction sa;

    int fd_so = -1;
    int fd_file = -1;
    int port = -1;
    char* udpbuffer;
    char* filename;
    int do_resynch = 1;

    u_int32_t packets_within_sec = 0;
    u_int32_t bytes_within_sec = 0;
    u_int32_t previous_sec = 0;
    u_int32_t first_psn = 0;
  
    u_int32_t starting_second;
    u_int32_t final_second;

    /* About */
    printf("VSIB UDP client tool 'vsib2udp_client' v1.00\n");
    printf("(C) 2009 Jan Wagner, Metsahovi Radio Observatory\n\n");
    if (argc <= 4) {
        printf("Usage: ./vsib2udp_client <source_port> <start_second> <duration_secs> <outputfile>\n"
               "  source_port   : the port on which to receive data\n"
               "  start_second  : start to record from which data second\n"
               "  duration_secs : how many data seconds to record\n"
               "  outputfile    : name and path of the output file\n\n"
        );
        return 0;
    }

    /* Params */
    port            = atoi(argv[1]);
    starting_second = atoll(argv[2]);
    final_second    = starting_second + atoll(argv[3]) - 1;
    filename        = argv[4];

    /* Register new SIGINT handler */
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sa.sa_handler = SIGINT_handler;
    sigaction(SIGINT, &sa, 0);

    /* Preparations */
    realtime_inits();
    udpbuffer = memalign(128, 65535);
    fd_file = open(filename, O_CREAT|O_WRONLY/*|O_TRUNC*/, S_IWUSR|S_IRUSR|S_IROTH|S_IRGRP|S_IWGRP);
    if (fd_file < 0) {
        printf("Could not create output file %s\n", filename); perror(" ");
        return 1;
    }

    /* Open a UDP-receiving socket */
    fd_so = connect_udp_listener(port);

    /* Receive loop */
    while (!m_caught_sigint) {
        ssize_t    nbytes;
        u_int32_t  psn, timestamp;
        u_int32_t* header  = (u_int32_t*)udpbuffer;
        char*      payload = udpbuffer + UDP_USER_HEADERLEN;
        static char recording_flag = 'n';

        /* Receive more data */
        nbytes = recv(fd_so, udpbuffer, 65535, 0);
        if (nbytes < 0) {
             perror("recvfrom");
             break;
        }

        /* Check the header, re-synch to first 1PPS change */
        timestamp = ntohl(header[0]);
        psn = ntohl(header[1]);
        if (do_resynch) {
            if (timestamp <= previous_sec) { continue; }
            do_resynch = 0;
        }

        /* Determine if data should be recorded */
        if (timestamp >= starting_second) { 
            if (timestamp <= final_second) {
                if (recording_flag == 'n') { first_psn = psn; }
                recording_flag = 'y';
            } else {
                printf("The last second has been recorded.\n");
                break;
            }
        }

        /* Just for statistics */
        if (timestamp > previous_sec) {
            float frames_within_sec = bytes_within_sec / (float)VLBA_FRAME_SIZE;
            fprintf(stderr, "Second %u; packets-per-sec %u, frames-per-sec %f, rate %5.2f Mbps : record='%c'\n", 
                    timestamp, packets_within_sec, frames_within_sec, 1e-6*8*bytes_within_sec, recording_flag);
            packets_within_sec = 0;
            bytes_within_sec = 0;
            previous_sec = timestamp;
        }
        packets_within_sec++;
        bytes_within_sec += (nbytes - UDP_USER_HEADERLEN);

        /* Write to output file */
        if (recording_flag == 'y') {
            if (psn >= first_psn) {
                off64_t offset = ((off64_t)nbytes - UDP_USER_HEADERLEN) * (psn - first_psn);
                size_t len = nbytes - UDP_USER_HEADERLEN;
                lseek64(fd_file, offset, SEEK_SET);
                if (write(fd_file, payload, len) < len) {
                    printf("short write\n");
                }
            }
        }
    }

    close(fd_so);
    close(fd_file);
    fprintf(stderr, "KTHX!\n");
    if (m_caught_sigint) { exit(0); }
    return 0;
}


///////////////////////////////////////////////////////////////////
// Tie to a single CPU core, prevent page swapping, boost priority
///////////////////////////////////////////////////////////////////
void realtime_inits()
{
    struct sched_param sp;
    cpu_set_t cpuset;
    int cpu = 0;

    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) < 0) {
        printf("Warning: could not tie CPU#%d\n", cpu);
        perror("sched_setaffinity");
    } else {
        printf("Program tied to CPU#%d\n", cpu);
    }
    sp.sched_priority = 50; //sched_get_priority_max(SCHED_RR); however, 99 not recommended!
    if (sched_setscheduler(0, SCHED_RR, &sp) < 0) {
        printf("Warning: could not change to Round-Robin scheduler\n");
        perror("sched_setscheduler");
    } else {
        printf("Changed to SCHED_RR with priority %d\n", sp.sched_priority);
    }

    if (mlockall(MCL_FUTURE | MCL_CURRENT)) {
        printf("Warning: mlockall() failed (not running as root?) with error\n");
        perror("mlockall");
    } else {
        printf("Locked future allocs with mlockall()\n");
    }

    mallopt(M_TRIM_THRESHOLD, -1);
    mallopt(M_MMAP_MAX, 0);
    fflush(stdout);
}


///////////////////////////////////////////////////////////////////
// Control-C / SIGINT handler
///////////////////////////////////////////////////////////////////
void SIGINT_handler(int signum)
{
    m_caught_sigint = 1;
}


///////////////////////////////////////////////////////////////////
// Listen on the specified port for UDP
///////////////////////////////////////////////////////////////////
int connect_udp_listener(int port)
{
    struct addrinfo  hints;
    struct addrinfo *info;
    struct addrinfo *info_save;
    char buffer[10];
    int  fd, xport=-1;

    /* set up the hints for getaddrinfo() */
    memset(&hints, 0, sizeof(hints));
    hints.ai_flags    = AI_PASSIVE;
    hints.ai_family   = AF_INET;
    hints.ai_socktype = SOCK_DGRAM;

    /* try to get address info for ourselves */
    sprintf(buffer, "%d", port);
    if (getaddrinfo(NULL, buffer, &hints, &info)) {
        printf("Error in getting address information");
        exit(1);
    }

    /* go through all address structures */
    info_save = info;
    do {
        int optval = 16*1024*1024;
        if ((fd = socket(info->ai_family, info->ai_socktype, info->ai_protocol)) < 0) {
            continue;
        }
        setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
        if (bind(fd, info->ai_addr, info->ai_addrlen) == 0) {
            xport = ntohs(((struct sockaddr_in*)info->ai_addr)->sin_port);
            fprintf(stderr, "Receiving data on UDP port %d\n", xport);
            break;
        }   
    } while ((info = info->ai_next) != NULL);

    /* done */
    freeaddrinfo(info_save);
    if (xport == -1) {
        printf("Couldn't find interfaces or couldn't bind port %d\n", port);
        exit(1);
    }
    return fd;
}


