Skip to content

Code reference

Bases: Device

Custom implementation of zmq processing server for X-ray detector MOENCH made in PSI which is integrated with a Tango device server.

Source code in pytango-moenchZmqServer\MoenchZmqServer.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
class MoenchZmqServer(Device):
    """Custom implementation of zmq processing server for X-ray detector MOENCH made in PSI which is integrated with a Tango device server."""

    processing_function = None
    processing_function_enum = ProcessingMode(0)

    _manager = None
    _context = None
    _socket = None
    _process_pool = None
    green_mode = GreenMode.Asyncio

    # probably should be rearranged in array, because there will pumped and unpumped images, for each type of processing
    # and further loaded with dynamic attributes
    shared_memory_pedestal = None
    shared_memory_analog_img = None
    shared_memory_threshold_img = None
    shared_memory_counting_img = None

    shared_threshold = None
    shared_counting_threshold = None
    shared_processed_frames = None
    shared_amount_frames = None
    shared_server_running = False
    shared_split_pump = None

    reorder_table = None

    _save_analog_img = True
    _save_threshold_img = True
    _save_counting_img = True

    ZMQ_RX_IP = device_property(
        dtype=str,
        doc="port of the slsReceiver instance, must match the config",
        default_value="192.168.2.200",
    )
    ZMQ_RX_PORT = device_property(
        dtype=str,
        doc="ip of slsReceiver instance, must match the config",
        default_value="50003",
    )
    PROCESSING_CORES = device_property(
        dtype=int,
        doc="cores amount to process, up to 72 on MOENCH workstation",
        default_value=20,
    )
    FLIP_IMAGE = device_property(
        dtype=bool,
        doc="should the final image be flipped/inverted along y-axis",
        default_value=True,
    )

    pedestal = attribute(
        display_level=DispLevel.EXPERT,
        label="pedestal",
        dtype=float,
        dformat=AttrDataFormat.IMAGE,
        max_dim_x=400,
        max_dim_y=400,
        access=AttrWriteType.READ_WRITE,
        doc="pedestal (averaged dark images), i.e. offset which will be subtracted from each acquired picture",
    )
    analog_img = attribute(
        display_level=DispLevel.EXPERT,
        label="analog img",
        dtype=float,
        dformat=AttrDataFormat.IMAGE,
        max_dim_x=400,
        max_dim_y=400,
        access=AttrWriteType.READ,
        doc="sum of images processed with subtracted pedestals",
    )
    threshold_img = attribute(
        display_level=DispLevel.EXPERT,
        label="threshold img",
        dtype=float,
        dformat=AttrDataFormat.IMAGE,
        max_dim_x=400,
        max_dim_y=400,
        access=AttrWriteType.READ,
        doc='sum of "analog images" (with subtracted pedestal) processed with thresholding algorithm',
    )
    counting_img = attribute(
        display_level=DispLevel.EXPERT,
        label="counting img",
        dtype=float,
        dformat=AttrDataFormat.IMAGE,
        max_dim_x=400,
        max_dim_y=400,
        access=AttrWriteType.READ,
        doc='sum of "analog images" (with subtracted pedestal) processed with counting algorithm',
    )

    threshold = attribute(
        label="th",
        unit="ADU",
        dtype=float,
        min_value=0.0,
        access=AttrWriteType.READ_WRITE,
        memorized=True,
        hw_memorized=True,
        doc="cut-off value for thresholding",
    )
    counting_threshold = attribute(
        label="counting th",
        unit="ADU",
        dtype=float,
        min_value=0.0,
        access=AttrWriteType.READ_WRITE,
        memorized=True,
        hw_memorized=True,
        doc="cut-off value for counting",
    )
    processing_mode = attribute(
        label="mode",
        dtype=ProcessingMode,
        access=AttrWriteType.READ_WRITE,
        memorized=True,
        hw_memorized=True,
        fisallowed="isWriteAvailable",
        doc="mode of frames processing [ANALOG = 0, THRESHOLD = 1, COUNTING = 2, PEDESTAL = 3]",
    )

    processed_frames = attribute(
        label="proc frames",
        dtype=int,
        access=AttrWriteType.READ,
        doc="amount of already processed frames",
    )
    amount_frames = attribute(
        label="expected frames",
        dtype=int,
        access=AttrWriteType.READ_WRITE,
        doc="expected frames to receive from detector",
    )

    server_running = attribute(
        display_level=DispLevel.EXPERT,
        label="is server running?",
        dtype=bool,
        access=AttrWriteType.READ,
        doc="if true - server is running, otherwise - not",
    )

    # split_pump = attribute(
    #     label="split (un)pumped",
    #     dtype=bool,
    #     access=AttrWriteType.READ_WRITE,
    #     memorized=True,
    #     hw_memorized=True,
    #     doc="split odd and even frames",
    # )

    save_analog_img = attribute(
        label="save analog",
        dtype=bool,
        access=AttrWriteType.READ_WRITE,
        memorized=True,
        hw_memorized=True,
        doc="save analog .tiff file after acquisition",
    )

    save_threshold_img = attribute(
        label="save threshold",
        dtype=bool,
        access=AttrWriteType.READ_WRITE,
        memorized=True,
        hw_memorized=True,
        doc="save threshold .tiff file after acquisition",
    )

    save_counting_img = attribute(
        label="save counting",
        dtype=bool,
        access=AttrWriteType.READ_WRITE,
        memorized=True,
        hw_memorized=True,
        doc="save counting .tiff file after acquisition",
    )

    def write_pedestal(self, value):
        self.shared_pedestal.value = value

    def read_pedestal(self):
        return self._read_shared_array(
            shared_memory=self.shared_memory_pedestal, flip=self.FLIP_IMAGE
        )

    def write_analog_img(self, value):
        self._write_shared_array(
            shared_memory=self.shared_memory_analog_img, value=value
        )

    def read_analog_img(self):
        return self._read_shared_array(
            shared_memory=self.shared_memory_analog_img, flip=self.FLIP_IMAGE
        )

    def write_threshold_img(self, value):
        self._write_shared_array(
            shared_memory=self.shared_memory_threshold_img, value=value
        )

    def read_threshold_img(self):
        return self._read_shared_array(
            shared_memory=self.shared_memory_threshold_img, flip=self.FLIP_IMAGE
        )

    def write_counting_img(self, value):
        self._write_shared_array(
            shared_memory=self.shared_memory_counting_img, value=value
        )

    def read_counting_img(self):
        return self._read_shared_array(
            shared_memory=self.shared_memory_counting_img, flip=self.FLIP_IMAGE
        )

    def write_threshold(self, value):
        self.shared_threshold.value = value

    def read_threshold(self):
        return self.shared_threshold.value

    def write_counting_threshold(self, value):
        self.shared_counting_threshold.value = value

    def read_counting_threshold(self):
        return self.shared_counting_threshold.value

    def write_processing_mode(self, value):
        # matching values and functions [ANALOG = 0, THRESHOLD = 1, COUNTING = 2]
        self.processing_function_enum = ProcessingMode(value)
        match self.processing_function_enum:
            case ProcessingMode.ANALOG:
                self.processing_function = processing_functions.analog
            case ProcessingMode.THRESHOLD:
                self.processing_function = processing_functions.thresholding
            case ProcessingMode.COUNTING:
                self.processing_function = processing_functions.counting

    def read_processing_mode(self):
        return self.processing_function_enum

    def write_processed_frames(self, value):
        self.shared_processed_frames.value = value

    def read_processed_frames(self):
        return self.shared_processed_frames.value

    def write_amount_frames(self, value):
        self.shared_amount_frames.value = value

    def read_amount_frames(self):
        return self.shared_amount_frames.value

    def write_server_running(self, value):
        self.shared_server_running.value = int(value)

    def read_server_running(self):
        return bool(self.shared_server_running.value)

    def write_save_analog_img(self, value):
        self._save_analog_img = value

    def read_save_analog_img(self):
        return self._save_analog_img

    def write_save_threshold_img(self, value):
        self._save_threshold_img = value

    def read_save_threshold_img(self):
        return self._save_threshold_img

    def write_save_counting_img(self, value):
        self._save_counting_img = value

    def read_save_counting_img(self):
        return self._save_counting_img

    # when processing is ready -> self.push_change_event(self, "analog_img"/"counting_img"/"threshold_img")

    async def main(self):
        while True:
            header, payload = await self.get_msg_pair()
            if payload is not None:
                # def wrap_function(
                # mode,
                # header,
                # payload,
                # lock,
                # shared_memories,
                # processed_frames,
                # amount_frames,
                # frame_func,
                # threshold,
                # counting_threshold,
                # *args,
                # **kwargs)
                self.shared_received_frames.value += 1
                future = self._process_pool.submit(
                    wrap_function,
                    self.processing_function_enum,
                    header,
                    payload.astype(float),
                    self._lock,
                    [
                        self.shared_memory_analog_img,
                        self.shared_memory_threshold_img,
                        self.shared_memory_counting_img,
                        self.shared_memory_pedestal,
                    ],  # need to changed corresponding to the frame_func
                    self.shared_processed_frames,
                    self.shared_received_frames,
                    self.processing_function,
                    self.shared_threshold,
                    self.shared_counting_threshold,
                )
                future = asyncio.wrap_future(future)

    async def get_msg_pair(self):
        isNextPacketData = True
        header = None
        payload = None
        packet1 = await self._socket.recv()
        try:
            print("parsing header...")
            header = json.loads(packet1)
            print(header)
            isNextPacketData = header.get("data") == 1
            print(f"isNextPacketdata {isNextPacketData}")
        except:
            print("is not header")
            isNextPacketData = False
        if isNextPacketData:
            print("parsing data...")
            packet2 = await self._socket.recv()
            payload = np.zeros([400, 400], dtype=np.uint16)
            raw_buffer = np.frombuffer(packet2, dtype=np.uint16)
            # see in docs
            # should be moved to each process to prevent performance bottleneck
            payload = raw_buffer[self.reorder_table]
        return header, payload

    def _read_shared_array(self, shared_memory, flip: bool):
        self._lock.acquire()
        buf = np.ndarray((400, 400), dtype=float, buffer=shared_memory.buf)
        array = np.copy(buf)
        self._lock.release()
        if flip:
            return np.flipud(array)
        else:
            return array

    def _write_shared_array(self, shared_memory, value):
        self._lock.acquire()
        array = np.ndarray((400, 400), dtype=float, buffer=shared_memory.buf)
        array = value
        self._lock.release()

    @command
    def start_receiver(self):
        empty = np.zeros([400, 400], dtype=float)
        self.write_server_running(True)
        self.write_processed_frames(0)
        self.write_analog_img(empty)
        self.write_counting_img(empty)
        self.write_threshold_img(empty)
        self.set_state(DevState.RUNNING)

    @command
    async def stop_receiver(self):
        received_frames = self.shared_received_frames.value
        print(f"received {received_frames} frames")
        loop = asyncio.get_event_loop()
        loop.create_task(self.async_stop_receiver(received_frames))

    async def async_stop_receiver(self, received_frames):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.block_stop_receiver, received_frames)

    def block_stop_receiver(self, received_frames_at_the_time):
        while received_frames_at_the_time != self.shared_processed_frames.value:
            print("not all frames processed yet")
            time.sleep(2)
        self.write_server_running(False)
        self.set_state(DevState.ON)
        print("all frames processed")
        # HERE ALL POST HOOKS
        # if processmode == pedestal:
        # shared_pedestal / received frames

    @command
    def acquire_pedestals(self):
        pass

    def init_device(self):
        """Initial tangoDS setup"""
        Device.init_device(self)
        self.set_state(DevState.INIT)
        self.get_device_properties(self.get_device_class())

        self.reorder_table = np.load("reorder_table.npy")
        # sync manager for synchronization between threads
        self._manager = mp.Manager()
        # using simple mutex (lock) to synchronize
        self._lock = self._manager.Lock()

        # manager for allocation of shared memory between threads
        self._shared_memory_manager = SharedMemoryManager()
        # starting the shared memory manager
        self._shared_memory_manager.start()
        # default values of properties do not work without database though ¯\_(ツ)_/¯
        processing_cores_amount = 16  # self.PROCESSING_CORES
        zmq_ip = "127.0.0.1"  # self.ZMQ_RX_IP
        zmq_port = "50003"  # self.ZMQ_RX_PORT

        # using shared threadsafe Value instance from multiprocessing
        self.shared_threshold = self._manager.Value("f", 0)
        self.shared_counting_threshold = self._manager.Value("f", 0)
        self.shared_server_running = self._manager.Value("b", 0)
        self.shared_processed_frames = self._manager.Value("I", 0)
        self.shared_received_frames = self._manager.Value("I", 0)
        self.shared_amount_frames = self._manager.Value("I", 0)
        self.shared_split_pump = self._manager.Value("b", 0)

        # calculating how many bytes need to be allocated and shared for a 400x400 float numpy array
        img_bytes = np.zeros([400, 400], dtype=float).nbytes
        # allocating 4 arrays of this type
        self.shared_memory_pedestal = self._shared_memory_manager.SharedMemory(
            size=img_bytes
        )
        self.shared_memory_analog_img = self._shared_memory_manager.SharedMemory(
            size=img_bytes
        )
        self.shared_memory_threshold_img = self._shared_memory_manager.SharedMemory(
            size=img_bytes
        )
        self.shared_memory_counting_img = self._shared_memory_manager.SharedMemory(
            size=img_bytes
        )
        # creating thread pool executor to which the frame processing will be assigned
        self._process_pool = ProcessPoolExecutor(processing_cores_amount)

        # creating and initialing socket to read from
        self._init_zmq_socket(zmq_ip, zmq_port)
        loop = asyncio.get_event_loop()
        loop.create_task(self.main())

        # initialization of tango events for pictures buffers
        self.set_change_event("analog_img", True, False)
        self.set_change_event("threshold_img", True, False)
        self.set_change_event("counting_img", True, False)
        self.set_state(DevState.ON)

    # updating of tango events for pictures buffers
    @command
    def update_images_events(self):
        self.push_change_event("analog_img", self.read_analog_img(), 400, 400),
        self.push_change_event("threshold_img", self.read_threshold_img(), 400, 400)
        self.push_change_event("counting_img", self.read_counting_img(), 400, 400)

    # save files on disk for pictures buffers
    def save_files(self, path, filename, index):
        """Function for saving the buffered images in .tiff format.
        The files will have different postfixes depending on processing mode.

        Args:
            path (str): folder to save
            filename (str): name to save
            index (str): capture index
        """
        savepath = os.path.join(path, filename)
        if self.read_save_analog_img():
            im = Image.fromarray(self.read_analog_img())
            im.save(f"{savepath}_{index}_analog.tiff")

        if self.read_save_threshold_img():
            im = Image.fromarray(self.read_threshold_img())
            im.save(f"{savepath}_{index}_threshold_{self.read_threshold()}.tiff")

        if self.read_save_counting_img():
            im = Image.fromarray(self.read_analog_img())
            im.save(
                f"{savepath}_{index}_counting_{self.read_counting_threshold()}.tiff"
            )

    def _init_zmq_socket(self, zmq_ip: str, zmq_port: str):
        endpoint = f"tcp://{zmq_ip}:{zmq_port}"
        self._context = zmq.asyncio.Context()
        self._socket = self._context.socket(zmq.SUB)
        print(f"Connecting to: {endpoint}")
        self._socket.connect(endpoint)
        self._socket.setsockopt(zmq.SUBSCRIBE, b"")

    def delete_device(self):
        self._process_pool.shutdown()
        self._manager.shutdown()
        self._shared_memory_manager.shutdown()

init_device()

Initial tangoDS setup

Source code in pytango-moenchZmqServer\MoenchZmqServer.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def init_device(self):
    """Initial tangoDS setup"""
    Device.init_device(self)
    self.set_state(DevState.INIT)
    self.get_device_properties(self.get_device_class())

    self.reorder_table = np.load("reorder_table.npy")
    # sync manager for synchronization between threads
    self._manager = mp.Manager()
    # using simple mutex (lock) to synchronize
    self._lock = self._manager.Lock()

    # manager for allocation of shared memory between threads
    self._shared_memory_manager = SharedMemoryManager()
    # starting the shared memory manager
    self._shared_memory_manager.start()
    # default values of properties do not work without database though ¯\_(ツ)_/¯
    processing_cores_amount = 16  # self.PROCESSING_CORES
    zmq_ip = "127.0.0.1"  # self.ZMQ_RX_IP
    zmq_port = "50003"  # self.ZMQ_RX_PORT

    # using shared threadsafe Value instance from multiprocessing
    self.shared_threshold = self._manager.Value("f", 0)
    self.shared_counting_threshold = self._manager.Value("f", 0)
    self.shared_server_running = self._manager.Value("b", 0)
    self.shared_processed_frames = self._manager.Value("I", 0)
    self.shared_received_frames = self._manager.Value("I", 0)
    self.shared_amount_frames = self._manager.Value("I", 0)
    self.shared_split_pump = self._manager.Value("b", 0)

    # calculating how many bytes need to be allocated and shared for a 400x400 float numpy array
    img_bytes = np.zeros([400, 400], dtype=float).nbytes
    # allocating 4 arrays of this type
    self.shared_memory_pedestal = self._shared_memory_manager.SharedMemory(
        size=img_bytes
    )
    self.shared_memory_analog_img = self._shared_memory_manager.SharedMemory(
        size=img_bytes
    )
    self.shared_memory_threshold_img = self._shared_memory_manager.SharedMemory(
        size=img_bytes
    )
    self.shared_memory_counting_img = self._shared_memory_manager.SharedMemory(
        size=img_bytes
    )
    # creating thread pool executor to which the frame processing will be assigned
    self._process_pool = ProcessPoolExecutor(processing_cores_amount)

    # creating and initialing socket to read from
    self._init_zmq_socket(zmq_ip, zmq_port)
    loop = asyncio.get_event_loop()
    loop.create_task(self.main())

    # initialization of tango events for pictures buffers
    self.set_change_event("analog_img", True, False)
    self.set_change_event("threshold_img", True, False)
    self.set_change_event("counting_img", True, False)
    self.set_state(DevState.ON)

save_files(path, filename, index)

Function for saving the buffered images in .tiff format. The files will have different postfixes depending on processing mode.

Parameters:

Name Type Description Default
path str

folder to save

required
filename str

name to save

required
index str

capture index

required
Source code in pytango-moenchZmqServer\MoenchZmqServer.py
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
def save_files(self, path, filename, index):
    """Function for saving the buffered images in .tiff format.
    The files will have different postfixes depending on processing mode.

    Args:
        path (str): folder to save
        filename (str): name to save
        index (str): capture index
    """
    savepath = os.path.join(path, filename)
    if self.read_save_analog_img():
        im = Image.fromarray(self.read_analog_img())
        im.save(f"{savepath}_{index}_analog.tiff")

    if self.read_save_threshold_img():
        im = Image.fromarray(self.read_threshold_img())
        im.save(f"{savepath}_{index}_threshold_{self.read_threshold()}.tiff")

    if self.read_save_counting_img():
        im = Image.fromarray(self.read_analog_img())
        im.save(
            f"{savepath}_{index}_counting_{self.read_counting_threshold()}.tiff"
        )