import shutil
import tempfile
import time
from os import PathLike
from pathlib import Path
from typing import Union
import SimpleITK as sitk
import dicom2nifti
import math
import nibabel as nib
import numpy as np
import pydicom
from pydicom import dcmread
from Hive.utils.file_utils import subfolders
from Hive.utils.log_utils import get_logger
logger = get_logger(__name__)
[docs]def dcm2nii_CT(CT_dcm_path: Union[str, PathLike], nii_out_path: Union[str, PathLike]):
"""
Conversion of CT DICOM to nifti and save in nii_out_path.
Parameters
----------
CT_dcm_path :
CT DICOM folder path.
nii_out_path :
Output NIFTI file path.
"""
with tempfile.TemporaryDirectory() as tmp:
tmp = Path(str(tmp))
dicom2nifti.convert_directory(CT_dcm_path, str(tmp), compression=True, reorient=True)
nii = next(tmp.glob("*nii.gz"))
shutil.copy(nii, nii_out_path)
[docs]def dcm2nii_mask(mask_dcm_path: Union[str, PathLike], nii_out_path: Union[str, PathLike], ref_nii_path: Union[str, PathLike]):
"""
Converts a SEG DICOM volume into NIFTI format. Requires an existing NIFTI file to derive the corresponding affine transform.
Parameters
----------
mask_dcm_path :
SEG DICOM folder.
nii_out_path :
NIFTI file saved as output.
ref_nii_path :
Reference NIFTI used to correctly saved the segmentation volume.
"""
mask_dcm = list(mask_dcm_path.glob("*.dcm"))[0]
mask = pydicom.read_file(str(mask_dcm))
mask_array = mask.pixel_array
mask_array = np.transpose(mask_array, (2, 1, 0))
mask_orientation = mask[0x5200, 0x9229][0].PlaneOrientationSequence[0].ImageOrientationPatient
if mask_orientation[4] == -1:
mask_array = np.flip(mask_array, 1)
# get affine matrix from the corresponding pet
pet = nib.load(ref_nii_path)
pet_affine = pet.affine
# return mask as nifti object
mask_out = nib.Nifti1Image(mask_array, pet_affine)
nib.save(mask_out, nii_out_path)
[docs]def convert_DICOM_folder_to_NIFTI_image(patient_dicom_folder: Union[str, PathLike], patient_nifti_folder: Union[str, PathLike]):
"""
Converts a given Patient DICOM folder into NIFTI format, saving the DICOM Studies in different folders.
Parameters
----------
patient_dicom_folder :
DICOM folder containing a single patient Studies.
patient_nifti_folder :
Output NIFTI folder used as stem to save the DICOM Studies. The Study index is appended to this path to create
the corresponding NIFTI study folder path.
"""
studies = subfolders(patient_dicom_folder, join=False)
single_study = False
if len(studies) == 1:
single_study = True
patient_study_map = {Path(patient_dicom_folder).name: {}}
for study_id, study in enumerate(studies):
patient_study_map[Path(patient_dicom_folder).name][study_id] = study
if not single_study:
Path(str(patient_nifti_folder) + "_{}".format(study_id)).mkdir(parents=True, exist_ok=True)
else:
Path(str(patient_nifti_folder)).mkdir(parents=True, exist_ok=True)
series = subfolders(Path(patient_dicom_folder).joinpath(study), join=False)
for serie in series:
first_file = next(Path(patient_dicom_folder).joinpath(study, serie).glob("*.dcm"))
ds = pydicom.dcmread(str(first_file))
if ds.Modality == "CT":
if not single_study:
ct_filename = str(
Path(str(patient_nifti_folder) + "_{}".format(study_id)).joinpath(
"{}_{}_CT.nii.gz".format(Path(patient_dicom_folder).name, study_id)
)
)
else:
ct_filename = str(
Path(str(patient_nifti_folder)).joinpath("{}_CT.nii.gz".format(Path(patient_dicom_folder).name))
)
dcm2nii_CT(str(Path(patient_dicom_folder).joinpath(study, serie)), ct_filename)
elif ds.Modality == "PT":
if not single_study:
pet_filename = str(
Path(str(patient_nifti_folder) + "_{}".format(study_id)).joinpath(
"{}_{}_PET.nii.gz".format(Path(patient_dicom_folder).name, study_id)
)
)
else:
pet_filename = str(
Path(str(patient_nifti_folder)).joinpath("{}_PET.nii.gz".format(Path(patient_dicom_folder).name))
)
normalize_PET_to_SUV_BW(str(Path(patient_dicom_folder).joinpath(study, serie)), pet_filename)
for study_id, study in enumerate(studies):
series = subfolders(Path(patient_dicom_folder).joinpath(study), join=False)
for serie in series:
first_file = next(Path(patient_dicom_folder).joinpath(study, serie).glob("*.dcm"))
ds = pydicom.dcmread(str(first_file))
if ds.Modality == "SEG":
if not single_study:
seg_filename = str(
Path(str(patient_nifti_folder) + "_{}".format(study_id)).joinpath(
"{}_{}_SEG.nii.gz".format(Path(patient_dicom_folder).name, study_id)
)
)
ref_filename = str(
Path(str(patient_nifti_folder) + "_{}".format(study_id)).joinpath(
"{}_{}_PET.nii.gz".format(Path(patient_dicom_folder).name, study_id)
)
)
else:
seg_filename = str(
Path(str(patient_nifti_folder)).joinpath("{}_SEG.nii.gz".format(Path(patient_dicom_folder).name))
)
ref_filename = str(
Path(str(patient_nifti_folder)).joinpath("{}_PET.nii.gz".format(Path(patient_dicom_folder).name))
)
dcm2nii_mask(Path(patient_dicom_folder).joinpath(study, serie), seg_filename, ref_filename)
return patient_study_map
[docs]def normalize_PET_to_SUV_BW(dicom_pet_series_folder: Union[str, PathLike], suv_pet_filename: Union[str, PathLike]):
"""
SUV BW Normalization of DICOM PET volume. The resulting normalized PET volume is saved at **suv_pet_filename**.
Parameters
----------
dicom_pet_series_folder :
DICOM PET Folder to be normalized.
suv_pet_filename:
Normalized SUV PET file location.
"""
reader = sitk.ImageSeriesReader()
dicom_names = reader.GetGDCMSeriesFileNames(dicom_pet_series_folder)
ds = dcmread(dicom_names[0])
corrected_image = ds[0x0028, 0x0051].value
decay_correction = ds[0x0054, 0x1102].value
units = ds[0x0054, 0x1001].value
series_date = ds.SeriesDate
acquisition_date = ds.AcquisitionDate
series_time = ds.SeriesTime
acquisition_time = ds.AcquisitionTime
half_life = ds.RadiopharmaceuticalInformationSequence[0].RadionuclideHalfLife
weight = ds.PatientWeight
if "ATTN" in corrected_image and "DECY" in corrected_image and decay_correction == "START":
if units == "BQML":
if series_time <= acquisition_time and series_date <= acquisition_date:
scan_date = series_date
scan_time = series_time
else:
scan_date = acquisition_date
scan_time = acquisition_time
# if not RadiopharmaceuticalStartTime in ds.RadiopharmaceuticalInformationSequence[0]:
# ...
# else:
start_time = ds.RadiopharmaceuticalInformationSequence[0].RadiopharmaceuticalStartTime
start_date = scan_date
scan_time = str(round(float(scan_time)))
str_scan_time = time.strptime(scan_date + scan_time, "%Y%m%d%H%M%S")
start_time = str(round(float(start_time)))
str_start_time = time.strptime(start_date + start_time, "%Y%m%d%H%M%S")
decay_time = time.mktime(str_scan_time) - time.mktime(str_start_time)
injected_dose = ds.RadiopharmaceuticalInformationSequence[0].RadionuclideTotalDose
decayed_dose = injected_dose * math.pow(2, -decay_time / half_life)
SUB_BW_scale_factor = (weight * 1000) / decayed_dose
rescale_slope = 1 # ds[0x0028,0x1053].value
total_factor = rescale_slope * SUB_BW_scale_factor
reader.SetFileNames(dicom_names)
image = reader.Execute()
image_array = sitk.GetArrayFromImage(image)
image_array *= total_factor
itk_image = sitk.GetImageFromArray(image_array)
itk_image.CopyInformation(image)
sitk.WriteImage(itk_image, suv_pet_filename)