Source code for hexsample.readout

# Copyright (C) 2023 luca.baldini@pi.infn.it
#
# For the license terms see the file LICENSE, distributed along with this
# software.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Readout facilities.
"""

from collections import Counter
from enum import Enum
from typing import Tuple

import numpy as np

from . import rng, xpol
from .digi import DigiEventCircular, DigiEventRectangular, DigiEventSparse
from .hexagon import HexagonalGrid, HexagonalLayout
from .roi import Padding, RegionOfInterest


[docs] class HexagonalReadoutMode(Enum): """Enum class expressing the possible readout strategies. """ # Sparse readout strategy. SPARSE = "SPARSE" # Rectangular readout, a la XPOL. RECTANGULAR = "RECTANGULAR" # Circular readout, with the highest pixel and the 6 neirest. CIRCULAR = "CIRCULAR"
[docs] class HexagonalReadoutBase(HexagonalGrid): """Description of a pixel readout chip on a hexagonal matrix. Arguments --------- layout : HexagonalLayout The layout of the hexagonal matrix. num_cols : int The number of columns in the readout. num_rows : int The number of rows in the readout. pitch : float The readout pitch in cm. enc : float The equivalent noise charge in electrons. gain : float The readout gain in ADC counts per electron. """ def __init__(self, layout: HexagonalLayout, num_cols: int, num_rows: int, pitch: float, enc: float, gain: float) -> None: """Constructor. """ # pylint: disable=too-many-arguments super().__init__(layout, num_cols, num_rows, pitch) self.enc = enc self.gain = gain self.shape = (self.num_rows, self.num_cols) self.trigger_id = -1
[docs] @staticmethod def discriminate(array: np.ndarray, threshold: float) -> np.ndarray: """Utility function acting as a simple constant-threshold discriminator over a generic array. This returns a boolean mask with True for all the array elements larger than the threshold. This is intented to avoid possible confusion between strict and loose comparison operators (e.g., < vs <=) when comparing the content of an array with a threshold, and all functions downstream doing this (e.g., zero_suppress) should use this and refrain from re-implementing their own logic. """ return array > threshold
[docs] @staticmethod def zero_suppress(array: np.ndarray, threshold: float) -> None: """Utility function to zero-suppress a generic array. This is returning an array of the same shape of the input where all the values lower or equal than the zero suppression threshold are set to zero. Arguments --------- array : array_like The input array. threshold : float The zero suppression threshold. """ mask = np.logical_not(HexagonalReadoutBase.discriminate(array, threshold)) array[mask] = 0
[docs] @staticmethod def latch_timestamp(timestamp: float) -> Tuple[int, int, int]: """Latch the event timestamp and return the corresponding fields of the digi event contribution: seconds, microseconds and livetime. .. warning:: The livetime calculation is not implemented, yet. Arguments --------- timestamp : float The ground-truth event timestamp from the event generator. """ microseconds, seconds = np.modf(timestamp) livetime = 0 return int(seconds), int(1000000 * microseconds), livetime
[docs] def digitize(self, pha: np.ndarray, zero_sup_threshold: int = 0, offset: int = 0) -> np.ndarray: """Digitize the actual signal. Arguments --------- pha : array_like The input array of pixel signals to be digitized. zero_sup_threshold : int Zero-suppression threshold in ADC counts. offset : int Optional offset in ADC counts to be applied before the zero suppression. """ # Note that the array type of the input pha argument is not guaranteed, here. # Over the course of the calculation the pha is bound to be a float (the noise # and the gain are floating-point numbere) before it is rounded to the neirest # integer. In order to take advantage of the automatic type casting that # numpy implements in multiplication and addition, we use the pha = pha +/* # over the pha +/*= form. # See https://stackoverflow.com/questions/38673531 # # Add the noise. if self.enc > 0: pha = pha + rng.generator.normal(0., self.enc, size=pha.shape) # ... apply the conversion between electrons and ADC counts... pha = pha * self.gain # ... round to the neirest integer... pha = np.round(pha).astype(int) # ... if necessary, add the offset for diagnostic events... pha += offset # ... zero suppress the thing... self.zero_suppress(pha, zero_sup_threshold) # ... flatten the array to simulate the serial readout and return the # array as the BEE would have. return pha.flatten()
[docs] class HexagonalReadoutSparse(HexagonalReadoutBase): """Description of a pixel sparse readout chip on a hexagonal matrix. In the following readout, no ROI is formed, every (and only) triggered pixel of the event is kept with its positional information in (col, row) format on the hexagonal grid. Arguments --------- layout : HexagonalLayout The layout of the hexagonal matrix. num_cols : int The number of columns in the readout. num_rows : int The number of rows in the readout. pitch : float The readout pitch in cm. enc : float The equivalent noise charge in electrons. gain : float The readout gain in ADC counts per electron. """
[docs] def read(self, timestamp: float, x: np.ndarray, y: np.ndarray, trg_threshold: float, zero_sup_threshold: int = 0, offset: int = 0) -> DigiEventSparse: """Sparse readout an event. Arguments --------- timestamp : float The event timestamp. x : array_like The physical x coordinates of the input charge. y : array_like The physical x coordinates of the input charge. trg_threshold : float Trigger threshold in electron equivalent. zero_sup_threshold : int Zero suppression threshold in ADC counts. offset : int Optional offset in ADC counts to be applied before the zero suppression. """ # Sample the input positions over the readout... signal = Counter((col, row) for col, row in zip(*self.world_to_pixel(x, y))) columns, rows, pha = np.array([[*key, value] for key, value in signal.items()]).T # ...apply the trigger... trigger_mask = self.discriminate(pha, trg_threshold) columns, rows, pha = columns[trigger_mask], rows[trigger_mask], pha[trigger_mask] # .. and digitize the pha values. pha = self.digitize(pha, zero_sup_threshold, offset) seconds, microseconds, livetime = self.latch_timestamp(timestamp) # And do not forget to increment the trigger identifier! self.trigger_id += 1 return DigiEventSparse(self.trigger_id, seconds, microseconds, livetime, pha, columns, rows)
[docs] class HexagonalReadoutRectangular(HexagonalReadoutBase): """Description of a pixel readout chip on a hexagonal matrix. """
[docs] @staticmethod def sum_miniclusters(array: np.ndarray) -> np.ndarray: """Sum the values in a given numpy array over its 2 x 2 trigger miniclusters. Note that the shape of the target 2-dimensional array must be even in both dimensions for the thing to work. """ num_rows, num_cols = array.shape return array.reshape((num_rows // 2, 2, num_cols // 2, 2)).sum(-1).sum(1)
[docs] @staticmethod def is_odd(value: int) -> bool: """Return whether the input integer is odd. See https://stackoverflow.com/questions/14651025/ for some metrics about the speed of this particular implementation. """ return value & 0x1
[docs] @staticmethod def is_even(value: int) -> bool: """Return whether the input integer is even. """ return not HexagonalReadoutRectangular.is_odd(value)
[docs] def sample(self, x: np.ndarray, y: np.ndarray) -> Tuple[Tuple[int, int], np.ndarray]: """Spatially sample a pair of arrays of x and y coordinates in physical space onto logical (hexagonal) coordinates in logical space. This is achieved by converting the (x, y) physical coordinates into the corresponding (col, row) logical coordinates on the hexagonal grid, and then filling a two-dimensional histogram in logical space. .. note:: The output two-dimensional histogram is restricted to the pixels with a physical signal, in order to avoid having to deal with large sparse arrays downstream. See https://github.com/lucabaldini/hexsample/issues/12 for more details about the reasoning behind this. Arguments --------- x : array_like The physical x coordinates to sample. y : array_like The physical y coordinates to sample. Returns ------- min_col, min_row, signal : 3-element tuple (2 integers and an array) The coordinates of the bottom-left corner of the smallest rectangle containing all the signal, and the corresponding histogram of the signal itself, in electron equivalent. """ # pylint: disable=invalid-name col, row = self.world_to_pixel(x, y) # Determine the corners of the relevant rectangle where the signal histogram # should be built. Reminder: in our trigger minicluster arrangement the minimum # column and row coordinates are always even and the maximum column and # row coordinates are always odd. min_col, max_col, min_row, max_row = col.min(), col.max(), row.min(), row.max() if self.is_odd(min_col): min_col -= 1 if self.is_even(max_col): max_col += 1 if self.is_odd(min_row): min_row -= 1 if self.is_even(max_row): max_row += 1 # Streamlined version of a two-dimensional histogram. As obscure as it # might seem, this four-liner is significantly faster than a call to # np.histogram2d and allows for a substantial speedup in the event generation. num_cols = max_col - min_col + 1 num_rows = max_row - min_row + 1 index = num_cols * (row - min_row) + (col - min_col) signal = np.bincount(index, minlength=num_cols * num_rows).reshape((num_rows, num_cols)) return min_col, min_row, signal
[docs] def trigger(self, signal: np.ndarray, trg_threshold, min_col: int, min_row: int, padding: Padding) -> Tuple[RegionOfInterest, np.ndarray]: """Apply the trigger, calculate the region of interest, and pad the signal array to the proper dimension. .. warning:: This is still incorrect at the edges of the readout chip, as we are not trimming the ROI (and the corresponding arrays) to the physical dimensions of the chip. """ # pylint: disable=too-many-arguments, too-many-locals # Sum the sampled signal into the 2 x 2 trigger miniclusters. trg_signal = self.sum_miniclusters(signal) # Zero-suppress the trigger signal below the trigger threshold. self.zero_suppress(trg_signal, trg_threshold) # This is tricky, and needs to be documented properly---basically we # build arrays with all the (minicluster) columns and rows for which # at least one minicluster is above threshold. The multiplicative factor # of 2 serves the purpose of converting minicluster to pixel coordinates. trg_cols = 2 * np.nonzero(trg_signal.sum(axis=0))[0] trg_rows = 2 * np.nonzero(trg_signal.sum(axis=1))[0] # Build the actual ROI in chip coordinates and initialize the RegionOfInterest # object. roi_min_col = min_col + trg_cols.min() - padding.left roi_max_col = min_col + trg_cols.max() + 1 + padding.right roi_min_row = min_row + trg_rows.min() - padding.top roi_max_row = min_row + trg_rows.max() + 1 + padding.bottom roi = RegionOfInterest(roi_min_col, roi_max_col, roi_min_row, roi_max_row, padding) # And now the actual PHA array: we start with all zeroes... pha = np.full(roi.shape(), 0.) # ...and then we patch the original signal array into the proper submask. num_rows, num_cols = signal.shape start_row = padding.bottom - trg_rows.min() start_col = padding.left - trg_cols.min() pha[start_row:start_row + num_rows, start_col:start_col + num_cols] = signal # And do not forget to increment the trigger identifier! self.trigger_id += 1 return roi, pha
[docs] def read(self, timestamp: float, x: np.ndarray, y: np.ndarray, trg_threshold: float, padding: Padding, zero_sup_threshold: int = 0, offset: int = 0) -> DigiEventRectangular: """Readout an event. Arguments --------- timestamp : float The event timestamp. x : array_like The physical x coordinates of the input charge. y : array_like The physical x coordinates of the input charge. trg_threshold : float Trigger threshold in electron equivalent. padding : Padding The padding to be applied to the ROT. zero_sup_threshold : int Zero suppression threshold in ADC counts. offset : int Optional offset in ADC counts to be applied before the zero suppression. """ # pylint: disable=invalid-name, too-many-arguments min_col, min_row, signal = self.sample(x, y) roi, pha = self.trigger(signal, trg_threshold, min_col, min_row, padding) pha = self.digitize(pha, zero_sup_threshold, offset) seconds, microseconds, livetime = self.latch_timestamp(timestamp) return DigiEventRectangular(self.trigger_id, seconds, microseconds, livetime, pha, roi)
[docs] class Xpol3(HexagonalReadoutRectangular): """Derived class representing the XPOL-III readout chip. """ def __init__(self, enc: float = 20., gain: float = 1.) -> None: """Constructor. """ super().__init__(xpol.XPOL1_LAYOUT, *xpol.XPOL3_SIZE, xpol.XPOL_PITCH, enc, gain)
[docs] class HexagonalReadoutCircular(HexagonalReadoutBase): """Description of a pixel circular readout chip on a hexagonal matrix. In the following readout, the maximum PHA pixel is found and the ROI formed by that pixel and its 6 adjacent neighbours. The standard shape of columns, rows and pha array is then 7, except for events on border, that will have len<7. Arguments --------- layout : HexagonalLayout The layout of the hexagonal matrix. num_cols : int The number of columns in the readout. num_rows : int The number of rows in the readout. pitch : float The readout pitch in cm. enc : float The equivalent noise charge in electrons. gain : float The readout gain in ADC counts per electron. """ NUM_PIXELS = 7
[docs] def read(self, timestamp: float, x: np.ndarray, y: np.ndarray, trg_threshold: float, zero_sup_threshold: int = 0, offset: int = 0) -> DigiEventCircular: """Circular readout an event. Arguments --------- timestamp : float The event timestamp. x : float The physical x coordinate of the highest pha pixel. y : float The physical y coordinate of the highest pha pixel. trg_threshold : float Trigger threshold in electron equivalent. zero_sup_threshold : int Zero suppression threshold in ADC counts. offset : int Optional offset in ADC counts to be applied before the zero suppression. """ # pylint: disable=unused-argument # Sample the input positions over the readout... sparse_signal = Counter((col, row) for col, row in zip(*self.world_to_pixel(x, y))) # ...sampling the input position of the highest PHA pixel over the readout... # See: https://stackoverflow.com/questions/70094914/max-on-collections-counter coord_max = max(sparse_signal, key=sparse_signal.get) # col_max, row_max = coord_max #... and converting it in ADC channel coordinates (value from 0 to 6)... adc_max = self.adc_channel(*coord_max) # ... creating a 7-elements array containing the PHA of the ADC channels from 0 to 6 # in increasing order and filling it with PHAs of the highest px and its neigbors... pha = np.empty(self.NUM_PIXELS) pha[adc_max] = sparse_signal[coord_max] # ... identifying the 6 neighbors of the central pixel and saving the signal pixels # prepending the coordinates of the highest one... for coords in self.neighbors(*coord_max): pha[self.adc_channel(*coords)] = sparse_signal[coords] # ...apply the trigger... # Not sure the trigger is needed, the highest px passed # necessarily the trigger or there is no event # trigger_mask = self.discriminate(pha, trg_threshold) # .. and digitize the pha values. pha = self.digitize(pha, zero_sup_threshold, offset) seconds, microseconds, livetime = self.latch_timestamp(timestamp) # And do not forget to increment the trigger identifier! self.trigger_id += 1 # The pha array is always in the order # [pha(adc0), pha(adc1), pha(adc2), pha(adc3), pha(adc4), pha(adc5), pha(adc6)] return DigiEventCircular(self.trigger_id, seconds, microseconds, livetime, pha, *coord_max)
# Mapping for the readout chip classes for each readout mode. _READOUT_CLASS_DICT = { HexagonalReadoutMode.SPARSE: HexagonalReadoutSparse, HexagonalReadoutMode.RECTANGULAR: HexagonalReadoutRectangular, HexagonalReadoutMode.CIRCULAR: HexagonalReadoutCircular }
[docs] def _readout_class(mode: HexagonalReadoutMode) -> type: """Return the proper class to be used to instantiate a readout chip for a given readout mode. """ return _READOUT_CLASS_DICT[mode]
[docs] def readout_chip(mode: HexagonalReadoutMode, *args, **kwargs): """Return an instance of the proper readout chip for a given readout mode. """ return _readout_class(mode)(*args, **kwargs)