Source code for cbf_sdp.packetiser

# -*- coding: utf-8 -*-
"""
Primary send functions for ska-sdp-cbf-emulator
"""

import asyncio
import configparser
import logging
import time
from pathlib import Path
from typing import Union

from realtime.receive.core import icd, msutils
from realtime.receive.core.baseline_utils import baselines

from cbf_sdp import transmitters

logger = logging.getLogger(__name__)


def add_missing_config_groups(config):
    if "transmission" not in config:
        config["transmission"] = {}
    if "reader" not in config:
        config["reader"] = {}
    if "payload" not in config:
        config["payload"] = {}


[docs]async def packetise( config: configparser.ConfigParser, ms: Union[msutils.MeasurementSet, Path, str], ): """ Reads data off a Measurement Set and transmits it using the transmitter specified in the configuration. Uses the vis_reader get data from the measurement set then gives it to the transmitter for packaging and transmission. This code is transmission protocol agnostic. """ add_missing_config_groups(config) if not isinstance(ms, msutils.MeasurementSet): ms = msutils.MeasurementSet.open(str(ms)) num_stations = ms.num_stations num_baselines = msutils.calc_baselines(ms) if num_baselines == baselines(num_stations, False): logger.warning("Baseline count indicates AUTO are not present") elif num_baselines == baselines(num_stations, True): logger.info("AUTOs present") transmission = config["transmission"] start_chan = config["reader"].getint( "start_chan", 0 ) # start channel to read num_chan = config["reader"].getint( "num_chan", 0 ) # number of constigeous channels to read, 0 -> all num_repeats = config["reader"].getint( "num_repeats", 1 ) # number of times to repeat data reading num_timestamps = config["reader"].getint( "num_timestamps", 0 ) # number of timestamps to read, 0 -> all scan_id = transmission.getint( "scan_id", 1 ) # the scan id to use for all payloads chan_per_stream = transmission.getint( "channels_per_stream", 0 ) # channels per stream, 0 -> 1 stream time_interval = transmission.getfloat("time_interval", 0) num_chan = msutils.clamp_num_chan(ms, start_chan, num_chan) transmission["total_channels"] = str(num_chan) logger.info(f"scan id : {scan_id}") logger.info(f"no. stations : {num_stations}") logger.info(f"no. baselines : {num_baselines}") logger.info(f"no. channels : {num_chan}") logger.info(f"first channel : {start_chan}") logger.info(f"channels per stream : {chan_per_stream} (0 == all)") logger.info( f"time interval : {time_interval} " "(0 == as per MS, <0 == fly through)" ) logger.info(f"no. repeats : {num_repeats}") if num_repeats <= 0: raise ValueError(f"num_repeats must be > 0: {num_repeats}") if num_timestamps == 1: if num_repeats > 1: raise ValueError( "repeating a single timestamp is not recommended, " "increase timestamps to at least 2" ) # Time interval calculations (fixed, MS-driven, none) if time_interval > 0: def intervals(): while True: yield time_interval elif time_interval == 0: def intervals(): interval = 0 prev_vis_time = yield yield 0 while True: vis_time = yield if vis_time < prev_vis_time: yield interval else: interval = vis_time - prev_vis_time yield interval prev_vis_time = vis_time else: def intervals(): while True: yield 0 # prime coroutine-like generator intervals = intervals() next(intervals) # Iterate over timesteps in the data transmitter = await transmitters.create( transmission, num_baselines, num_chan ) start_time = time.time() async with transmitter: repeat_count = 0 for repeat_count in range(0, num_repeats): # FIXME: the vis_reader is not reading the weights / flags # and passing it on vis_reader = msutils.vis_reader( ms, start_chan=start_chan, num_chan=num_chan, num_timestamps=num_timestamps, timestamp_offset=repeat_count, ) prev_send_start = time.time() async for vis_amps, ts, ts_fraction in vis_reader: # Gets interval value to emulate, adjust to remove # runtime overhead waiting_time = intervals.send(icd.icd_to_unix(ts, ts_fraction)) next(intervals) if waiting_time > 0: waiting_time -= time.time() - prev_send_start if waiting_time > 0: await asyncio.sleep(waiting_time) prev_send_start = time.time() await transmitter.send(scan_id, ts, ts_fraction, vis_amps) # Print time taken. duration = time.time() - start_time data_size = transmitter.bytes_sent / 1024 / 1024 logger.info( "Scan %s sent %.3f [MB] in %.3f [s] (%.3f [MB/s], %.3f [heaps/s])", scan_id, data_size, duration, (data_size / duration), transmitter.heaps_sent / duration, ) return transmitter.heaps_sent