/*
 * vsib2multicast_recast
 * (C) Jan Wagner 05/16/09
 *
 * Joins a IP multicast group and attempts
 * to receive the packets sent by vsib2multicast.
 * The packets are then forwarded as unicast to a new IP.
 */

#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>

#include <time.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <malloc.h>
#include <fcntl.h>
#include <signal.h>

#define __USE_GNU
#include <sched.h>
#undef __USE_GNU
#include <pthread.h>
#include <limits.h>

#define VLBA_FRAME_SIZE     80000
#define UDP_USER_HEADERLEN  (2*sizeof(u_int32_t))

static void realtime_inits();
static void SIGINT_handler(int);
static int  connect_to_multicast(char* group_ip, int group_port);
static int  open_unicast_socket(char* dest_ip, int dest_port, struct sockaddr_in* addr);
static void disconnect_from_multicast(int, char*);

static volatile sig_atomic_t m_caught_sigint = 0;

int main(int argc, char *argv[])
{
    struct sigaction sa;
    struct sockaddr_in addr_fwd;

    int fd_so = -1;
    int fd_udp = -1;
    char* udpbuffer;
    int do_resynch = 1;

    u_int32_t packets_within_sec = 0;
    u_int32_t bytes_within_sec = 0;
    u_int32_t previous_sec = 0;

    /* About */
    printf("VSIB multicast recasting tool 'vsib2multicast_recast' v1.00\n");
    printf("(C) 2009 Jan Wagner, Metsahovi Radio Observatory\n\n");
    if (argc <= 4) {
        printf("Usage: ./vsib2multicast_recast <group_ip> <group_port> <destination_ip> <destination_port>\n"
               "  group_ip   : the multicast group IP between 224.0.0.0-239.255.255.255\n"
               "  group_port : the multicast input port number\n"
               "  dest_ip    : the unicast host destination IP\n"
               "  dest_port  : the unicast host destination port\n"
               "Joins the specified multicast group, forwards to unicast IP.\n\n"
        );
        exit(1);
    }

    /* Register new SIGINT handler */
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sa.sa_handler = SIGINT_handler;
    sigaction(SIGINT, &sa, 0);

    /* Preparations */
    realtime_inits();
    udpbuffer = memalign(128, 65535);

    /* Open a client socket to the multicast group */
    fd_so = connect_to_multicast(argv[1], atoi(argv[2]));

    /* Open a socket for sending UDP */
    fd_udp = open_unicast_socket(argv[3], atoi(argv[4]), &addr_fwd);

    /* Receive loop */
    while (!m_caught_sigint) {
        struct sockaddr_in addr;
        socklen_t addrlen;
        int nbytes;
        u_int32_t  psn, timestamp;
        u_int32_t* header  = (u_int32_t*)udpbuffer;
        char*      payload = udpbuffer + UDP_USER_HEADERLEN;

        /* Receive more data */
        addrlen = sizeof(addr);
        nbytes = recvfrom(fd_so, udpbuffer, 65535, 0, (struct sockaddr*)&addr, &addrlen);
        if (nbytes < 0) {
             perror("recvfrom");
             break;
        }

        /* Check the header, re-synch to first 1PPS change */
        timestamp = ntohl(header[0]);
        psn = ntohl(header[1]);
        if (do_resynch) {
            if (timestamp <= previous_sec) { continue; }
            do_resynch = 0;
        }

        /* Forward as unicast UDP to destination */
        if (sendto(fd_udp, udpbuffer, nbytes, 0, (struct sockaddr *)&addr_fwd, sizeof(addr_fwd)) < 0)
        {
             perror("sendto");
             break;
        }

        /* Just for statistics */
        if (timestamp > previous_sec) {
            float frames_within_sec = bytes_within_sec / (float)VLBA_FRAME_SIZE;
            fprintf(stderr, "Seconds changed to %u; packets-per-sec %u, frames-per-sec %f, rate %5.2f Mbps\n", 
                    timestamp, packets_within_sec, frames_within_sec, 1e-6*8*bytes_within_sec);
            packets_within_sec = 0;
            bytes_within_sec = 0;
            previous_sec = timestamp;
        }
        packets_within_sec++;
        bytes_within_sec += (nbytes - UDP_USER_HEADERLEN);
    }

    disconnect_from_multicast(fd_so, argv[1]);
    fprintf(stderr, "KTHX!\n");
    return 0;
}


///////////////////////////////////////////////////////////////////
// Tie to a single CPU core, prevent page swapping, boost priority
///////////////////////////////////////////////////////////////////
void realtime_inits()
{
    struct sched_param sp;
    cpu_set_t cpuset;
    int cpu = 1;

    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (sched_setaffinity(0, sizeof(cpu_set_t), &cpuset) < 0) {
        printf("Warning: could not tie CPU#%d\n", cpu);
        perror("sched_setaffinity");
    } else {
        printf("Program tied to CPU#%d\n", cpu);
    }
    sp.sched_priority = 50; //sched_get_priority_max(SCHED_RR); however, 99 not recommended!
    if (sched_setscheduler(0, SCHED_RR, &sp) < 0) {
        printf("Warning: could not change to Round-Robin scheduler\n");
        perror("sched_setscheduler");
    } else {
        printf("Changed to SCHED_RR with priority %d\n", sp.sched_priority);
    }

    if (mlockall(MCL_FUTURE | MCL_CURRENT)) {
        printf("Warning: mlockall() failed (not running as root?) with error\n");
        perror("mlockall");
    } else {
        printf("Locked future allocs with mlockall()\n");
    }

    mallopt(M_TRIM_THRESHOLD, -1);
    mallopt(M_MMAP_MAX, 0);
    fflush(stdout);
}


///////////////////////////////////////////////////////////////////
// Control-C / SIGINT handler
///////////////////////////////////////////////////////////////////
void SIGINT_handler(int signum)
{
    m_caught_sigint = 1;
}


///////////////////////////////////////////////////////////////////
// Join the specified multicast group
///////////////////////////////////////////////////////////////////
int connect_to_multicast(char* group_ip, int group_port)
{
    struct sockaddr_in addr;
    struct ip_mreq mreq;
    int fd;

    /* Create ordinary UDP socket */
    if ((fd=socket(AF_INET,SOCK_DGRAM,0)) < 0) {
        perror("socket");
        exit(1);
    }

    /* Bind to port */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port        = htons(group_port);
    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind to all interfaces");
        exit(1);
    }

    /* Request that the kernel join a multicast group */
    mreq.imr_multiaddr.s_addr = inet_addr(group_ip);
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
    if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
        perror("setsockopt add multicast membership");
        exit(1);
    }

    return fd;
}


///////////////////////////////////////////////////////////////////
// Cleanly leave a multicast group
///////////////////////////////////////////////////////////////////
void disconnect_from_multicast(int fd, char* group_ip)
{
    struct ip_mreq mreq;
    mreq.imr_multiaddr.s_addr = inet_addr(group_ip);
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
    if (setsockopt(fd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
        perror("setsockopt drop multicast membership");
    }
    close(fd);
}


///////////////////////////////////////////////////////////////////
// Open a socket and prepare UDP destination
///////////////////////////////////////////////////////////////////
int open_unicast_socket(char* dest_ip, int dest_port, struct sockaddr_in* addr)
{
    int fd = -1;
    int optval;
    socklen_t optlen;
    memset(addr, 0, sizeof(struct sockaddr_in));
    if ((fd=socket(AF_INET,SOCK_DGRAM,0)) < 0) {
        perror("failed to create sender socket");
        exit(1);
    }
    optval = 16*1024*1024;
    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
    getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &optval, &optlen);
    addr->sin_family      = AF_INET;
    addr->sin_addr.s_addr = inet_addr(dest_ip);
    addr->sin_port        = htons(dest_port);
    printf("Forwarding multicast UDP to: %s port %d, buffer size %d bytes\n", dest_ip, dest_port, optval);
    return fd;
}

