Source code for hexsample.readout

# Copyright (C) 2023--2025 the hexsample team.
#
# For the license terms see the file LICENSE, distributed along with this
# software.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Readout facilities.
"""

from abc import ABC, abstractmethod
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Tuple

import numpy as np

from . import rng
from .base import TypeProxy
from .digi import DigiEventBase, DigiEventCircular, DigiEventRectangular
from .hexagon import HexagonalGrid
from .roi import Padding, RegionOfInterest


class AbstractReadout(ABC):

    """Interface shared by all the pixel readout chips.

    The class declares a single abstract method, :meth:`read`, together with
    its signature; every concrete readout chip must implement it.
    """

    @abstractmethod
    def read(self, timestamp: float, x: np.ndarray, y: np.ndarray,
             offset: int = 0) -> DigiEventBase:
        """Read out a single event, given the input coordinates of the charge.

        Arguments
        ---------
        timestamp : float
            The event timestamp.

        x : array_like
            The physical x coordinates of the input charge.

        y : array_like
            The physical y coordinates of the input charge.

        offset : int
            Optional offset in ADC counts to be applied before the zero
            suppression.
        """
@dataclass
class HexagonalReadoutBase(HexagonalGrid, AbstractReadout):

    """Description of a pixel readout chip on a hexagonal matrix.

    Note that, in addition to the physical properties (noise and gain) the
    readout chip comes up at creation time with a well defined configuration,
    in terms of trigger and zero-suppression thresholds. Arguably, in the
    future we might add the possibility to change these parameters at runtime
    to make things more germane to what happens in real life, but for the time
    being that seems hardly necessary.

    Also note the readout chip inherits all the members from HexagonalGrid.

    Arguments
    ---------
    enc : float
        The equivalent noise charge in electrons.

    gain : float
        The readout gain in ADC counts per electron (default 1, which means
        that the PHA you get out are the electrons collected).

    trg_threshold : float
        Trigger threshold in electron equivalent (note this is a float because
        it is expressed in physical space, not in electronics space).

    zero_sup_threshold : int
        Zero suppression threshold in ADC counts.
    """

    enc: float = 30.
    gain: float = 1.
    trg_threshold: float = 500.
    zero_sup_threshold: int = 0

    def __post_init__(self):
        """Post-initialization.

        Run the HexagonalGrid post-initialization and reset the trigger
        identifier.
        """
        HexagonalGrid.__post_init__(self)
        # The counter starts at -1 and is incremented before being used by the
        # concrete readouts, so that the first event carries the identifier 0.
        self.trigger_id = -1

    @staticmethod
    def is_odd(value: int) -> bool:
        """Return whether the input integer is odd.

        See https://stackoverflow.com/questions/14651025/ for some metrics about
        the speed of this particular implementation.

        The result is explicitly converted to ``bool``, so that the return
        value matches the signature (and the behavior of ``is_even()``)
        instead of leaking the raw 0/1 outcome of the bitwise operation.
        """
        return bool(value & 0x1)

    @staticmethod
    def is_even(value: int) -> bool:
        """Return whether the input integer is even.
        """
        return not value & 0x1

    @staticmethod
    def discriminate(array: np.ndarray, threshold: float) -> np.ndarray:
        """Utility function acting as a simple constant-threshold discriminator
        over a generic array. This returns a boolean mask with True for all the
        array elements strictly larger than the threshold.

        This is intended to avoid possible confusion between strict and loose
        comparison operators (e.g., < vs <=) when comparing the content of an
        array with a threshold, and all functions downstream doing this (e.g.,
        zero_suppress) should use this and refrain from re-implementing their
        own logic.

        Note this is a static method that can be used interchangeably, e.g.,
        to operate on the pulse height array or the trigger signal array.

        Arguments
        ---------
        array : array_like
            The input array.

        threshold : float
            The discrimination threshold.
        """
        return array > threshold

    @staticmethod
    def zero_suppress(array: np.ndarray, threshold: float) -> None:
        """Utility function to zero-suppress a generic array.

        The input array is modified *in place*: all the values lower than or
        equal to the zero suppression threshold are set to zero. Nothing is
        returned.

        Note this is a static method that can be used interchangeably, e.g.,
        to operate on the pulse height array or the trigger signal array.

        Arguments
        ---------
        array : array_like
            The input array (modified in place).

        threshold : float
            The zero suppression threshold.
        """
        # Everything that does not pass the discriminator is suppressed.
        mask = np.logical_not(HexagonalReadoutBase.discriminate(array, threshold))
        array[mask] = 0

    @staticmethod
    def latch_timestamp(timestamp: float) -> Tuple[int, int, int]:
        """Latch the event timestamp and return the corresponding fields of the
        digi event contribution: seconds, microseconds and livetime.

        .. warning::
           The livetime calculation is not implemented, yet, and the livetime
           is always returned as zero.

        Arguments
        ---------
        timestamp : float
            The ground-truth event timestamp from the event generator.

        Returns
        -------
        seconds, microseconds, livetime : 3-element tuple of integers
            The integral part of the timestamp, its fractional part expressed
            in microseconds (truncated, not rounded) and the livetime.
        """
        # Mind np.modf() returns the fractional part first.
        microseconds, seconds = np.modf(timestamp)
        livetime = 0
        return int(seconds), int(1000000 * microseconds), livetime

    def digitize(self, pha: np.ndarray, offset: int = 0) -> np.ndarray:
        """Digitize the actual signal.

        Arguments
        ---------
        pha : array_like
            The input array of pixel signals to be digitized.

        offset : int
            Optional offset in ADC counts to be applied before the zero
            suppression.

        Returns
        -------
        pha : np.ndarray
            The flattened, one-dimensional array of integer pulse heights.
        """
        # Note that the array type of the input pha argument is not guaranteed, here.
        # Over the course of the calculation the pha is bound to be a float (the noise
        # and the gain are floating-point numbers) before it is rounded to the nearest
        # integer. In order to take advantage of the automatic type casting that
        # numpy implements in multiplication and addition, we use the pha = pha +/*
        # over the pha +/*= form.
        # See https://stackoverflow.com/questions/38673531
        #
        # Add the noise.
        if self.enc > 0:
            pha = pha + rng.generator.normal(0., self.enc, size=pha.shape)
        # ... apply the conversion between electrons and ADC counts...
        pha = pha * self.gain
        # ... round to the nearest integer...
        pha = np.round(pha).astype(int)
        # ... if necessary, add the offset for diagnostic events...
        pha += offset
        # ... zero suppress the thing (in place)...
        self.zero_suppress(pha, self.zero_sup_threshold)
        # ... flatten the array to simulate the serial readout and return the
        # array as the BEE would have.
        return pha.flatten()
class HexagonalReadoutCircular(HexagonalReadoutBase):

    """Fixed 7-pixel ROI readout chip on a hexagonal matrix.

    This class mimics a readout chip with a fixed 7-pixel ROI on a hexagonal
    matrix, where the maximum PHA is found and the ROI is formed by that pixel
    and its 6 adjacent neighbours.

    Note events on border will have less than 7 pixels.
    """

    NUM_PIXELS = 7

    def read(self, timestamp: float, x: np.ndarray, y: np.ndarray,
             offset: int = 0) -> DigiEventCircular:
        """Overloaded method.

        Arguments
        ---------
        timestamp : float
            The event timestamp.

        x : array_like
            The physical x coordinates of the input charge.

        y : array_like
            The physical y coordinates of the input charge.

        offset : int
            Optional offset in ADC counts to be applied before the zero
            suppression.

        Raises
        ------
        ValueError
            If the input coordinates are empty (there is no pixel with the
            maximum signal to start from).
        """
        # pylint: disable=unused-argument
        # Sample the input positions over the readout: the counter maps each
        # (col, row) pixel to the number of charge carriers falling into it...
        sparse_signal = Counter(zip(*self.world_to_pixel(x, y)))
        # ...find the pixel with the highest signal...
        # See: https://stackoverflow.com/questions/70094914/max-on-collections-counter
        coord_max = max(sparse_signal, key=sparse_signal.get)
        # ... and convert it in ADC channel coordinates (value from 0 to 6).
        adc_max = self.adc_channel(*coord_max)
        # Create the 7-element array holding the signal of the ADC channels
        # from 0 to 6, in increasing order. This is explicitly initialized to
        # zero (as opposed to np.empty()), so that any channel that is not
        # filled below, e.g., for events on the border of the chip, does not
        # carry uninitialized memory into the digitization.
        pha = np.zeros(self.NUM_PIXELS)
        pha[adc_max] = sparse_signal[coord_max]
        # Fill the signal for the neighbors of the central pixel. (The counter
        # returns 0 for all the pixels with no signal.)
        for coords in self.neighbors(*coord_max):
            pha[self.adc_channel(*coords)] = sparse_signal[coords]
        # Note no trigger is applied, here. Not sure the trigger is needed: the
        # highest pixel necessarily passed the trigger, or there is no event.
        #
        # Digitize the pha values...
        pha = self.digitize(pha, offset)
        seconds, microseconds, livetime = self.latch_timestamp(timestamp)
        # ...and do not forget to increment the trigger identifier!
        self.trigger_id += 1
        # The pha array is always in the order
        # [pha(adc0), pha(adc1), pha(adc2), pha(adc3), pha(adc4), pha(adc5), pha(adc6)]
        return DigiEventCircular(self.trigger_id, seconds, microseconds, livetime,
                                 pha, *coord_max)
@dataclass
class HexagonalReadoutRectangular(HexagonalReadoutBase):

    """ROI-based readout chip on a hexagonal matrix.

    This mimics the functionality of the various generations of XPOL readout
    chips, where a ROT (region of trigger) is automatically defined based on
    the 2 x 2 minicluster trigger logic, and then expanded with a configurable
    padding to form the ROI (region of interest) that is actually read out.

    Arguments
    ---------
    padding : Padding
        The padding to be applied on the four sides of the region of trigger
        in order to build the region of interest.
    """

    # NOTE(review): the default is a single Padding instance shared by all the
    # readout objects; this presumably relies on Padding being immutable.
    padding: Padding = Padding(2, 2, 2, 2)

    @staticmethod
    def sum_miniclusters(array: np.ndarray) -> np.ndarray:
        """Sum the values in a given numpy array over its 2 x 2 trigger
        miniclusters.

        Note that the shape of the target 2-dimensional array must be even in
        both dimensions for the thing to work.

        Arguments
        ---------
        array : np.ndarray
            The input two-dimensional array, with shape (num_rows, num_cols).

        Returns
        -------
        np.ndarray
            The array of the minicluster sums, with shape
            (num_rows // 2, num_cols // 2).
        """
        num_rows, num_cols = array.shape
        # Split each axis into (minicluster index, position within the
        # minicluster) and sum over the two "position" axes.
        return array.reshape((num_rows // 2, 2, num_cols // 2, 2)).sum(-1).sum(1)

    def sample(self, x: np.ndarray, y: np.ndarray) -> Tuple[int, int, np.ndarray]:
        """Spatially sample a pair of arrays of x and y coordinates in physical
        space onto logical (hexagonal) coordinates in logical space.

        This is achieved by converting the (x, y) physical coordinates into the
        corresponding (col, row) logical coordinates on the hexagonal grid, and
        then filling a two-dimensional histogram in logical space.

        .. note::
           The output two-dimensional histogram is restricted to the pixels with
           a physical signal, in order to avoid having to deal with large sparse
           arrays downstream. See https://github.com/lucabaldini/hexsample/issues/12
           for more details about the reasoning behind this.

        Arguments
        ---------
        x : array_like
            The physical x coordinates to sample.

        y : array_like
            The physical y coordinates to sample.

        Returns
        -------
        min_col, min_row, signal : 3-element tuple (2 integers and an array)
            The coordinates of the corner with the smallest column and row of
            the smallest rectangle (aligned to the trigger miniclusters)
            containing all the signal, and the corresponding histogram of the
            signal itself, in electron equivalent. The histogram has shape
            (num_rows, num_cols).
        """
        # pylint: disable=invalid-name
        col, row = self.world_to_pixel(x, y)
        # Determine the corners of the relevant rectangle where the signal histogram
        # should be built. Reminder: in our trigger minicluster arrangement the minimum
        # column and row coordinates are always even and the maximum column and
        # row coordinates are always odd.
        min_col, max_col, min_row, max_row = col.min(), col.max(), row.min(), row.max()
        if self.is_odd(min_col):
            min_col -= 1
        if self.is_even(max_col):
            max_col += 1
        if self.is_odd(min_row):
            min_row -= 1
        if self.is_even(max_row):
            max_row += 1
        # Streamlined version of a two-dimensional histogram. As obscure as it
        # might seem, this four-liner is significantly faster than a call to
        # np.histogram2d and allows for a substantial speedup in the event generation.
        num_cols = max_col - min_col + 1
        num_rows = max_row - min_row + 1
        index = num_cols * (row - min_row) + (col - min_col)
        signal = np.bincount(index, minlength=num_cols * num_rows).reshape((num_rows, num_cols))
        return min_col, min_row, signal

    def trigger(self, signal: np.ndarray, min_col: int,
                min_row: int) -> Tuple[RegionOfInterest, np.ndarray]:
        """Apply the trigger, calculate the region of interest, and pad the
        signal array to the proper dimension.

        .. warning::
           This is still incorrect at the edges of the readout chip, as we are
           not trimming the ROI (and the corresponding arrays) to the physical
           dimensions of the chip.

        Arguments
        ---------
        signal : np.ndarray
            The two-dimensional signal histogram, as returned by sample().

        min_col : int
            The column identifier, in chip coordinates, of the first column of
            the signal array.

        min_row : int
            The row identifier, in chip coordinates, of the first row of the
            signal array.

        Returns
        -------
        roi, pha : 2-element tuple
            The region of interest and the corresponding array of signals,
            in electron equivalent. Any signal falling outside the region of
            interest is dropped.

        Raises
        ------
        ValueError
            If no trigger minicluster is above threshold (this is raised by
            numpy when calculating the minimum of an empty array).
        """
        # pylint: disable=too-many-arguments, too-many-locals
        # Sum the sampled signal into the 2 x 2 trigger miniclusters.
        trg_signal = self.sum_miniclusters(signal)
        # Zero-suppress the trigger signal below the trigger threshold.
        self.zero_suppress(trg_signal, self.trg_threshold)
        # This is tricky, and needs to be documented properly---basically we
        # build arrays with all the (minicluster) columns and rows for which
        # at least one minicluster is above threshold. The multiplicative factor
        # of 2 serves the purpose of converting minicluster to pixel coordinates.
        trg_cols = 2 * np.nonzero(trg_signal.sum(axis=0))[0]
        trg_rows = 2 * np.nonzero(trg_signal.sum(axis=1))[0]
        first_trg_col, last_trg_col = trg_cols.min(), trg_cols.max()
        first_trg_row, last_trg_row = trg_rows.min(), trg_rows.max()
        # Build the actual ROI in chip coordinates and initialize the RegionOfInterest
        # object. (The + 1 accounts for the second pixel of the last minicluster.)
        roi_min_col = min_col + first_trg_col - self.padding.left
        roi_max_col = min_col + last_trg_col + 1 + self.padding.right
        roi_min_row = min_row + first_trg_row - self.padding.top
        roi_max_row = min_row + last_trg_row + 1 + self.padding.bottom
        roi = RegionOfInterest(roi_min_col, roi_max_col, roi_min_row, roi_max_row, self.padding)
        # And now the actual PHA array: we start with all zeroes...
        pha = np.full(roi.shape(), 0.)
        # ...and then we patch the original signal array into the proper submask.
        # The offsets are the position of the signal[0, 0] element within the
        # pha array, i.e., min_row - roi_min_row and min_col - roi_min_col. Note
        # that, since the first row of the ROI is set by the top padding, it is
        # the top padding that must be used for the row offset.
        # (This assumes that roi.shape() is (num_rows, num_cols) and the first
        # element of pha corresponds to (roi_min_col, roi_min_row).)
        row_offset = self.padding.top - first_trg_row
        col_offset = self.padding.left - first_trg_col
        # The sampled signal might extend beyond the ROI, when there is charge
        # farther than the padding from the triggering miniclusters, in which
        # case the offsets are negative and/or the signal overruns the pha
        # array. Restrict the copy to the overlap between the two, in order to
        # avoid negative indices wrapping around and shape mismatches.
        num_rows, num_cols = signal.shape
        roi_num_rows, roi_num_cols = pha.shape
        row_lo = max(0, -row_offset)
        row_hi = min(num_rows, roi_num_rows - row_offset)
        col_lo = max(0, -col_offset)
        col_hi = min(num_cols, roi_num_cols - col_offset)
        pha[row_lo + row_offset:row_hi + row_offset,
            col_lo + col_offset:col_hi + col_offset] = signal[row_lo:row_hi, col_lo:col_hi]
        # And do not forget to increment the trigger identifier!
        self.trigger_id += 1
        return roi, pha

    def read(self, timestamp: float, x: np.ndarray, y: np.ndarray,
             offset: int = 0) -> DigiEventRectangular:
        """Overloaded method.

        Arguments
        ---------
        timestamp : float
            The event timestamp.

        x : array_like
            The physical x coordinates of the input charge.

        y : array_like
            The physical y coordinates of the input charge.

        offset : int
            Optional offset in ADC counts to be applied before the zero
            suppression.
        """
        # pylint: disable=invalid-name, too-many-arguments
        min_col, min_row, signal = self.sample(x, y)
        # Mind the trigger identifier is incremented within the trigger() call.
        roi, pha = self.trigger(signal, min_col, min_row)
        pha = self.digitize(pha, offset)
        seconds, microseconds, livetime = self.latch_timestamp(timestamp)
        return DigiEventRectangular(self.trigger_id, seconds, microseconds, livetime, pha, roi)
# Definition of the readout proxy. ReadoutProxy = TypeProxy("readout_mode") # pylint: disable=invalid-name ReadoutProxy.register("circular", HexagonalReadoutCircular) ReadoutProxy.register("rectangular", HexagonalReadoutRectangular) # TODO: TO BE REMOVED!
class HexagonalReadoutMode(str, Enum):

    """Enumeration of the supported readout strategies.

    The members are also plain strings, so that they compare equal to the
    corresponding lowercase mode name.
    """

    RECTANGULAR = "rectangular"
    CIRCULAR = "circular"
# Mapping for the readout chip classes for each readout mode. _READOUT_CLASS_DICT = { HexagonalReadoutMode.RECTANGULAR: HexagonalReadoutRectangular, HexagonalReadoutMode.CIRCULAR: HexagonalReadoutCircular }
def _readout_class(mode: HexagonalReadoutMode) -> type:
    """Look up the class implementing the readout chip for a given readout mode.

    Arguments
    ---------
    mode : HexagonalReadoutMode
        The readout mode.

    Raises
    ------
    KeyError
        If the readout mode is not among the keys of the class mapping.
    """
    readout_cls = _READOUT_CLASS_DICT[mode]
    return readout_cls
def readout_chip(mode: HexagonalReadoutMode, *args, **kwargs):
    """Create a readout chip of the proper type for a given readout mode.

    Arguments
    ---------
    mode : HexagonalReadoutMode
        The readout mode.

    *args, **kwargs
        The positional and keyword arguments forwarded verbatim to the
        constructor of the readout chip class.
    """
    chip_class = _readout_class(mode)
    return chip_class(*args, **kwargs)