UKB数据提取

UKB数据提取

1
2
3
4
5
6
7
8
9
10
11
root@415d37a2c97b:/opt/notebooks# cd /mnt
root@415d37a2c97b:/mnt# cd project
root@415d37a2c97b:/mnt/project# cd *TA
root@415d37a2c97b:/opt/notebooks# cd /mnt
root@415d37a2c97b:/mnt# cd project
root@415d37a2c97b:/mnt/project# cd *TA
root@415d37a2c97b:/opt/notebooks# cd /mnt
root@415d37a2c97b:/mnt# cd project
root@415d37a2c97b:/mnt/project# cd *TA
root@415d37a2c97b:/mnt/project/GMV_DATA# cp *.csv /opt/notebooks
root@415d37a2c97b:/mnt/project/GMV_DATA#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# 导入所需的库
import pandas as pd # 用于读取表格数据
import os # 用于操作系统功能,如文件路径操作、遍历目录
import shutil # 用于文件操作,如复制文件
import sys # 用于系统相关功能,如退出脚本、标准错误输出
import math # 用于数学运算(如此处不需要,但有时可能有用)

# --- 配置设定 ---

# 1. 包含参与者ID和目标文件名的表格文件的路径
# (请更新为您的表格文件的正确路径和名称)
participant_table_path = 'Participant_table.csv'

# 2. 表格中包含目标 T1 .zip 文件名的列的名称
# (请从您的表格文件 image_4b5fe0.jpg 中获取确切的列标题)
# *** 请务必用您表格中确切的列名称更新此变量! ***
filename_column = 'T1 structural brain images - NIFTI | Instance 2'

# 3. 存放 T1 .zip 文件的基础源目录 (文件在其中的子文件夹内)
source_base_dir = '/mnt/project/Bulk/Brain MRI/T1/'

# 4. 您希望将找到的 .zip 文件复制到的 **基础** 目标目录
# 脚本会在此目录下创建 batch_001, batch_002 ... 等子文件夹
destination_base_dir = './T1_NIFTI_Batched' # 修改基础目标目录名

# 5. 每个批次子文件夹中最多存放的文件数量
batch_size = 100

# 6. 您的参与者表格文件的分隔符 (CSV 通常用逗号 ',', TSV 通常用制表符 '\t')
file_delimiter=',' # 默认使用逗号
# file_delimiter='\t' # 如果是制表符分隔,请取消本行注释并注释掉上面一行
# --- 配置结束 ---
# --- 脚本主要逻辑 ---
# 尝试创建基础目标目录,如果它不存在的话
try:
os.makedirs(destination_base_dir, exist_ok=True)
print(f"基础目标目录已创建或已存在: {destination_base_dir}")
except OSError as e:
print(f"错误:无法创建基础目标目录 {destination_base_dir}: {e}", file=sys.stderr)
sys.exit(1) # 退出脚本

# 读取参与者表格,获取目标 .zip 文件列表
try:
print(f"正在读取参与者表格: {participant_table_path}")
df = pd.read_csv(participant_table_path, delimiter=file_delimiter, dtype=str)
if filename_column not in df.columns:
print(f"错误: 在文件 {participant_table_path} 中未找到列 '{filename_column}'", file=sys.stderr)
print(f"文件中包含的列有: {list(df.columns)}", file=sys.stderr)
print("请检查并更新脚本中的 'filename_column' 变量。", file=sys.stderr)
sys.exit(1)
target_files = set(df[filename_column].dropna().unique())
print(f"从表格中找到 {len(target_files)} 个唯一的目标 .zip 文件名。")
if not target_files:
print("警告:在指定列中未找到任何目标文件名。请检查列名和文件内容。")
sys.exit(0)
except FileNotFoundError:
print(f"错误: 参与者表格文件未找到,路径: {participant_table_path}", file=sys.stderr)
print("请检查并更新脚本中的 'participant_table_path' 变量。", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"错误:读取参与者表格 {participant_table_path} 时出错: {e}", file=sys.stderr)
print(f"请检查您的文件分隔符 '{file_delimiter}' 是否设置正确。", file=sys.stderr)
sys.exit(1)

# 用于跟踪已成功复制的文件数量,以确定批次
files_copied_count = 0
# 用于存储在源目录中实际找到并已处理的目标文件名,防止重复处理
files_processed = set()

# 遍历源基础目录及其所有子目录
print(f"\n正在搜索目标文件于: {source_base_dir} 及其子目录...")
try:
# 为了让批次稍微稳定些(虽然不保证全局有序),可以先收集所有找到的文件路径
found_file_paths = {} # 存储找到的文件名及其完整路径
for root, dirs, files_in_current_dir in os.walk(source_base_dir):
for file_name in files_in_current_dir:
if file_name in target_files:
# 如果同一个文件名在不同地方找到,保留第一个找到的路径(或根据需要选择逻辑)
if file_name not in found_file_paths:
found_file_paths[file_name] = os.path.join(root, file_name)

print(f"在源目录中初步定位到 {len(found_file_paths)} 个目标文件。开始复制并分批...")

# 按文件名排序(可选,为了让处理顺序更确定)
sorted_target_files_found = sorted(list(found_file_paths.keys()))

# 遍历找到的目标文件进行复制和分批
for filename_to_copy in sorted_target_files_found:
# --- 分批逻辑开始 ---
# 计算当前文件应该属于哪个批次 (从 1 开始计数)
# 使用 files_copied_count 来决定批次号
batch_index = (files_copied_count // batch_size) + 1
# 构建当前批次的子目录名称 (例如 batch_001, batch_002),使用 3 位补零
current_batch_subdir_name = f"batch_{batch_index:03d}"
# 构建当前批次的完整目录路径
current_batch_dir = os.path.join(destination_base_dir, current_batch_subdir_name)

# 如果当前批次的子目录不存在,则创建它
os.makedirs(current_batch_dir, exist_ok=True)
# --- 分批逻辑结束 ---

# 获取源文件路径和目标文件路径(指向批次子目录)
source_path = found_file_paths[filename_to_copy]
dest_path = os.path.join(current_batch_dir, filename_to_copy) # 指向子目录

# 尝试复制文件
try:
print(f" 目标文件: {filename_to_copy}")
print(f" 归入批次: {current_batch_subdir_name}")
# print(f" 源路径: {source_path}") # 可取消注释以打印源路径
# print(f" 目标路径: {dest_path}") # 可取消注释以打印目标路径
print(f" 正在复制...")
shutil.copy2(source_path, dest_path) # copy2 保留元数据
files_copied_count += 1 # 只有成功复制后才增加计数器
files_processed.add(filename_to_copy) # 标记为已处理
print(f" 成功复制 {filename_to_copy}{current_batch_subdir_name}")
except Exception as e:
print(f" 错误:复制文件 {filename_to_copy} 时出错: {e}", file=sys.stderr)
# 如果复制失败,可以选择是否继续处理下一个文件,目前是继续

except Exception as e:
print(f"在目录遍历或文件处理过程中发生严重错误: {e}", file=sys.stderr)

压缩

1
2
3
4
zip -r 001.zip batch_001/

for dir in batch_???/; do [ -d "$dir" ] && base=$(basename "$dir") && num=${base#batch_} && zip -r "${num}.zip" "$dir"; done
#请在包含所有 batch_XXX 文件夹的父目录下,直接运行上面这条单行命令。 它会自动查找所有 batch_XXX/ 目录,并为每一个目录执行正确的 zip -r XXX.zip batch_XXX/ 命令。

UKB数据提取
https://xingdayup.github.io/2025/05/07/UKB_extraction/
Author
Jesse Chen
Posted on
May 7, 2025
Licensed under