# -*- coding: utf-8 -*-
"""
Primary send functions for ska-sdp-cbf-emulator
"""
import asyncio
import configparser
import logging
import time
from pathlib import Path
from typing import Union
from realtime.receive.core import icd, msutils
from realtime.receive.core.baseline_utils import baselines
from cbf_sdp import transmitters
logger = logging.getLogger(__name__)
def add_missing_config_groups(config):
    """
    Ensure the configuration sections read by the packetiser exist.

    Any of the ``transmission``, ``reader`` and ``payload`` groups that is
    absent from *config* is created empty; groups that are already present
    are left untouched. The mapping is modified in place.
    """
    for group_name in ("transmission", "reader", "payload"):
        if group_name in config:
            continue
        config[group_name] = {}
def _make_interval_source(time_interval):
    """
    Build the callable that yields the emulated gap before each send.

    The returned callable takes the UNIX time of the visibility about to be
    sent and returns the number of seconds that should separate it from the
    previous send:

    * ``time_interval > 0``: always *time_interval* (fixed cadence).
    * ``time_interval == 0``: the difference between consecutive visibility
      times. The first call returns 0, and when a time is earlier than the
      previous one the last computed interval is reused.
    * otherwise: always 0 (no waiting).
    """
    if time_interval > 0:

        def fixed_interval(_vis_time):
            return time_interval

        return fixed_interval

    if time_interval == 0:
        previous_vis_time = None
        interval = 0

        def interval_from_timestamps(vis_time):
            nonlocal previous_vis_time, interval
            if previous_vis_time is None:
                # Nothing to compare the very first visibility against.
                previous_vis_time = vis_time
                return 0
            went_backwards = vis_time < previous_vis_time
            if not went_backwards:
                interval = vis_time - previous_vis_time
            # When time goes backwards the last known interval is reused.
            previous_vis_time = vis_time
            return interval

        return interval_from_timestamps

    def no_interval(_vis_time):
        return 0

    return no_interval


async def packetise(
    config: configparser.ConfigParser,
    ms: Union[msutils.MeasurementSet, Path, str],
):
    """
    Reads data off a Measurement Set and transmits it using the transmitter
    specified in the configuration.

    Uses the vis_reader to get data from the measurement set, then gives it
    to the transmitter for packaging and transmission. This code is
    transmission protocol agnostic.

    :param config: the configuration; missing ``transmission``, ``reader``
        and ``payload`` groups are added, and
        ``transmission["total_channels"]`` is overwritten with the number of
        channels actually read.
    :param ms: an already opened Measurement Set, or the path of one to open.
    :returns: the transmitter's ``heaps_sent`` counter after transmission.
    :raises ValueError: if ``num_repeats`` is not positive, or if a single
        timestamp is requested together with more than one repeat.
    """
    add_missing_config_groups(config)
    if not isinstance(ms, msutils.MeasurementSet):
        ms = msutils.MeasurementSet.open(str(ms))

    num_stations = ms.num_stations
    num_baselines = msutils.calc_baselines(ms)
    if num_baselines == baselines(num_stations, False):
        logger.warning("Baseline count indicates AUTO are not present")
    elif num_baselines == baselines(num_stations, True):
        logger.info("AUTOs present")

    transmission = config["transmission"]
    reader = config["reader"]
    # start channel to read
    start_chan = reader.getint("start_chan", 0)
    # number of contiguous channels to read, 0 -> all
    num_chan = reader.getint("num_chan", 0)
    # number of times to repeat data reading
    num_repeats = reader.getint("num_repeats", 1)
    # number of timestamps to read, 0 -> all
    num_timestamps = reader.getint("num_timestamps", 0)
    # the scan id to use for all payloads
    scan_id = transmission.getint("scan_id", 1)
    # channels per stream, 0 -> 1 stream
    chan_per_stream = transmission.getint("channels_per_stream", 0)
    time_interval = transmission.getfloat("time_interval", 0)

    num_chan = msutils.clamp_num_chan(ms, start_chan, num_chan)
    # Config values must be strings; the value is passed on to the
    # transmitter through the "transmission" group.
    transmission["total_channels"] = str(num_chan)

    logger.info(f"scan id : {scan_id}")
    logger.info(f"no. stations : {num_stations}")
    logger.info(f"no. baselines : {num_baselines}")
    logger.info(f"no. channels : {num_chan}")
    logger.info(f"first channel : {start_chan}")
    logger.info(f"channels per stream : {chan_per_stream} (0 == all)")
    logger.info(
        f"time interval : {time_interval} "
        "(0 == as per MS, <0 == fly through)"
    )
    logger.info(f"no. repeats : {num_repeats}")

    if num_repeats <= 0:
        raise ValueError(f"num_repeats must be > 0: {num_repeats}")
    if num_timestamps == 1 and num_repeats > 1:
        raise ValueError(
            "repeating a single timestamp is not recommended, "
            "increase timestamps to at least 2"
        )

    # Time interval calculations (fixed, MS-driven, none)
    next_interval = _make_interval_source(time_interval)

    transmitter = await transmitters.create(
        transmission, num_baselines, num_chan
    )

    # Elapsed time is measured with the monotonic clock so that wall-clock
    # adjustments cannot distort the pacing or the reported throughput.
    start_time = time.monotonic()
    async with transmitter:
        for repeat_count in range(num_repeats):
            # FIXME: the vis_reader is not reading the weights / flags
            # and passing it on
            vis_reader = msutils.vis_reader(
                ms,
                start_chan=start_chan,
                num_chan=num_chan,
                num_timestamps=num_timestamps,
                timestamp_offset=repeat_count,
            )
            prev_send_start = time.monotonic()
            # Iterate over timesteps in the data
            async for vis_amps, ts, ts_fraction in vis_reader:
                # Gets interval value to emulate, adjust to remove
                # runtime overhead
                waiting_time = next_interval(
                    icd.icd_to_unix(ts, ts_fraction)
                )
                if waiting_time > 0:
                    waiting_time -= time.monotonic() - prev_send_start
                if waiting_time > 0:
                    await asyncio.sleep(waiting_time)
                prev_send_start = time.monotonic()
                await transmitter.send(scan_id, ts, ts_fraction, vis_amps)

    # Print time taken.
    duration = time.monotonic() - start_time
    data_size = transmitter.bytes_sent / 1024 / 1024
    heaps_sent = transmitter.heaps_sent
    # A zero duration (coarse clock, nothing sent) must not crash the
    # summary; report zero rates in that case.
    data_rate = data_size / duration if duration > 0 else 0.0
    heap_rate = heaps_sent / duration if duration > 0 else 0.0
    logger.info(
        "Scan %s sent %.3f [MB] in %.3f [s] (%.3f [MB/s], %.3f [heaps/s])",
        scan_id,
        data_size,
        duration,
        data_rate,
        heap_rate,
    )
    return heaps_sent