feat: finetune 2.7M

This commit is contained in:
zin
2026-06-02 14:42:39 +00:00
parent c631c5649a
commit 9ce92b70a9
6 changed files with 1864 additions and 51 deletions
+314
View File
@@ -0,0 +1,314 @@
import os
import gc
import pickle
import random
from pathlib import Path
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
import torchvision.io as tv_io
from torch.amp import autocast, GradScaler
from tqdm import tqdm
import timm
# ==========================================
# 1. CONFIGURATION AND PATHS
# ==========================================
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Используем устройство: {DEVICE}")

# Root of the large image dataset (on NFS) and the pickled list of scanned paths.
DATA_ROOT = Path("/home/zin/projects/Thesis/dataset/Original-2.41M")
CACHE_PATH = Path("/home/zin/projects/Thesis/src/dataset_paths_cache.pkl")

# Model files.
# Weights from the previous (118K) training run, used as the fine-tuning start.
PREVIOUS_WEIGHTS = Path("/home/zin/projects/Thesis/src/emoset_resnet50_best.pth")
# Full session state used to resume an interrupted run.
RESUME_CHECKPOINT = Path("/home/zin/projects/Thesis/src/emoset_resnet50_resume.pth")
# Final weights written when training finishes.
SAVE_MODEL_PATH = Path("/home/zin/projects/Thesis/src/emoset_resnet50_finetuned_2.41M.pth")

# Emotion folder name -> class index; "sadness" is an alias of "sad" (both map to 7).
EMO_MAP = {
    name: index
    for index, name in enumerate(
        (
            "amusement",
            "anger",
            "awe",
            "contentment",
            "disgust",
            "excitement",
            "fear",
            "sad",
        )
    )
}
EMO_MAP["sadness"] = EMO_MAP["sad"]

# --- TRAINING SETTINGS ---
BATCH_SIZE = 82
EPOCHS = 15
LR = 5e-5  # deliberately low: this is fine-tuning, not training from scratch
NUM_TRAIN_WORKERS = 48
NUM_VAL_WORKERS = 18

# Early-stopping bookkeeping (initial values; may be overwritten when resuming).
PATIENCE = 4
best_val_loss = float('inf')
epochs_no_improve = 0
start_epoch = 1
# ==========================================
# 2. DATA PREPARATION (FAST PATH CACHE)
# ==========================================
# The recursive scan of DATA_ROOT is done once; afterwards the list of
# (path, label) pairs is read back from CACHE_PATH.
if CACHE_PATH.exists():
    print(f"Загрузка списка файлов из кэша: {CACHE_PATH}...")
    # NOTE: pickle.load can execute arbitrary code. CACHE_PATH must only ever
    # point at a file written by this script itself.
    with open(CACHE_PATH, 'rb') as f:
        cache_data = pickle.load(f)
    all_paths = cache_data['image_paths']
    all_labels = cache_data['labels']
    if not all_paths:
        # An empty cache would otherwise fail later with an obscure unpacking
        # error in zip(*combined); fail here with an actionable message.
        raise RuntimeError(
            f"Кэш {CACHE_PATH} пуст. Удалите его и запустите сканирование заново."
        )
    print(f"Готово! Моментально загружено {len(all_paths)} путей.")
else:
    print(f"Сканирование NFS директории {DATA_ROOT} (Выполняется один раз)...")
    all_paths, all_labels = [], []
    for img_path in DATA_ROOT.rglob('*.jpg'):
        # The class name is the directory two levels above the image file;
        # files whose directory is not a known emotion are skipped.
        emotion_folder = img_path.parts[-3].lower()
        if emotion_folder in EMO_MAP:
            all_paths.append(str(img_path))
            all_labels.append(EMO_MAP[emotion_folder])
    if not all_paths:
        # Do NOT persist an empty listing: it would be reloaded on every
        # following run (e.g. after a scan with the dataset share unmounted).
        raise RuntimeError(
            f"В {DATA_ROOT} не найдено ни одного изображения — кэш не сохранён. "
            "Проверьте, что датасет доступен."
        )
    with open(CACHE_PATH, 'wb') as f:
        pickle.dump({'image_paths': all_paths, 'labels': all_labels}, f)
    print(f"Сохранено в кэш: {len(all_paths)} изображений.")

# Train / validation split (95% / 5%).
# The seed is fixed so that the split stays the same across restarts
# (given the same cached ordering).
random.seed(42)
combined = list(zip(all_paths, all_labels))
random.shuffle(combined)
all_paths, all_labels = zip(*combined)
split_idx = int(len(all_paths) * 0.95)
train_paths, train_labels = all_paths[:split_idx], all_labels[:split_idx]
val_paths, val_labels = all_paths[split_idx:], all_labels[split_idx:]
print(f"Трейн: {len(train_paths)} | Валидация: {len(val_paths)}")
# ==========================================
# 3. DATASET & DATALOADER
# ==========================================
class EmoSetDirectDataset(Dataset):
    """Dataset that decodes images straight from a list of file paths.

    Each item is ``(image, label)`` where ``image`` is a float32 tensor scaled
    to [0, 1] and resized to 256x256, and ``label`` is ``labels[idx]``.
    Cropping, augmentation and normalisation are not done here.
    """

    def __init__(self, image_paths, labels):
        self.image_paths = image_paths
        self.labels = labels
        # Only the basic resize runs in the loader workers (on the CPU).
        self.base_transform = T.Resize((256, 256), antialias=True)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        try:
            decoded = tv_io.read_image(
                self.image_paths[idx], mode=tv_io.ImageReadMode.RGB
            )
            image = self.base_transform(decoded.to(torch.float32) / 255.0)
        except Exception:
            # Fault tolerance for broken files: any failure while reading or
            # resizing yields an all-zero image, still paired with its label.
            image = torch.zeros((3, 256, 256), dtype=torch.float32)
        return image, self.labels[idx]
# --- DATA LOADERS (WITH PREFETCH) ---
# Options shared by the training and validation loaders.
_loader_common = dict(
    batch_size=BATCH_SIZE,
    pin_memory=True,
    prefetch_factor=2,
    persistent_workers=True,
)

# Training data is reshuffled every epoch.
train_loader = DataLoader(
    EmoSetDirectDataset(train_paths, train_labels),
    shuffle=True,
    num_workers=NUM_TRAIN_WORKERS,
    **_loader_common,
)

# Validation data is read in a fixed order.
val_loader = DataLoader(
    EmoSetDirectDataset(val_paths, val_labels),
    shuffle=False,
    num_workers=NUM_VAL_WORKERS,
    **_loader_common,
)
# ==========================================
# 4. AUGMENTATIONS ON THE GPU
# ==========================================
# Both pipelines are applied to whole batches after they were moved to DEVICE.
# NOTE(review): torchvision documents that random transforms called on a
# batched tensor use the same random parameters for every image in the batch,
# so crop/flip/jitter here presumably vary per batch, not per sample — confirm
# this is acceptable.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training: random 224x224 crop, horizontal flip, mild colour jitter, normalise.
_train_steps = [
    T.RandomCrop((224, 224)),
    T.RandomHorizontalFlip(p=0.5),
    T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    T.Normalize(_NORM_MEAN, _NORM_STD),
]
gpu_train_transforms = torch.nn.Sequential(*_train_steps).to(DEVICE)

# Validation: deterministic centre crop followed by the same normalisation.
_val_steps = [
    T.CenterCrop((224, 224)),
    T.Normalize(_NORM_MEAN, _NORM_STD),
]
gpu_val_transforms = torch.nn.Sequential(*_val_steps).to(DEVICE)
# ==========================================
# 5. MODEL INITIALISATION
# ==========================================
print("\nСоздание архитектуры ResNet-50...")
model = timm.create_model('resnet50', pretrained=False, num_classes=8).to(DEVICE)

# Decide once whether this run resumes a saved session, so the two phases
# below (choosing the model, restoring the state) cannot disagree.
resuming = RESUME_CHECKPOINT.exists()

if not resuming:
    print("Чекпоинт сессии не найден. Проверяем наличие базовых весов...")
    if PREVIOUS_WEIGHTS.exists():
        print(f"📥 Загрузка базовых весов (от 118K): {PREVIOUS_WEIGHTS}")
        model.load_state_dict(torch.load(PREVIOUS_WEIGHTS, map_location=DEVICE))
    else:
        print("ВНИМАНИЕ: Базовые веса не найдены. Обучение начнется с нуля (ImageNet).")
        # This REPLACES the model object, which is why the optimizer and the
        # scheduler are only created further down: built earlier, they would
        # keep optimising the parameters of the discarded model and the new
        # one would never be updated.
        model = timm.create_model('resnet50', pretrained=True, num_classes=8).to(DEVICE)

# Optimisation components are bound to the final model object.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
scaler = GradScaler()

# --- SEAMLESS SESSION RESTORE ---
if resuming:
    print(f"ВОССТАНОВЛЕНИЕ СЕССИИ из: {RESUME_CHECKPOINT}")
    checkpoint = torch.load(RESUME_CHECKPOINT, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    # These two keys are optional so that checkpoints lacking them still load.
    if 'scaler_state_dict' in checkpoint:
        scaler.load_state_dict(checkpoint['scaler_state_dict'])
    if 'best_val_loss' in checkpoint:
        best_val_loss = checkpoint['best_val_loss']
    # The checkpoint stores the last saved epoch; continue with the next one.
    start_epoch = checkpoint['epoch'] + 1
    print(f"УСПЕХ: Продолжаем обучение с эпохи {start_epoch}")
# ==========================================
# 6. MAIN TRAINING LOOP
# ==========================================
# Mixed precision is only enabled on CUDA; on a CPU-only machine the forward
# passes run in plain fp32 instead of requesting a CUDA autocast context.
USE_AMP = DEVICE == "cuda"

# Early stopping may not fire before this epoch.
# NOTE(review): if EPOCHS is also 15, early stopping can only trigger on the
# very last epoch — confirm this is intended.
MIN_EPOCHS_BEFORE_STOP = 15


def _save_resume_checkpoint(completed_epoch):
    """Write the full session state to RESUME_CHECKPOINT.

    Args:
        completed_epoch: number of the last fully finished epoch. It is
            stored under 'epoch'; the restore logic continues from
            ``completed_epoch + 1``.
    """
    torch.save(
        {
            'epoch': completed_epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'scaler_state_dict': scaler.state_dict(),
            'best_val_loss': best_val_loss,
        },
        RESUME_CHECKPOINT,
    )


if start_epoch > EPOCHS:
    print(f"\nОбучение уже было завершено (цель: {EPOCHS} эпох).")
else:
    print(f"\nСтарт обучения. Целевое количество эпох: {EPOCHS}")
    # Last epoch whose training, validation and scheduler step all finished.
    last_completed_epoch = start_epoch - 1
    # Pre-bind so the interrupt handler can always report an epoch number.
    epoch = start_epoch
    try:
        for epoch in range(start_epoch, EPOCHS + 1):
            # --- PHASE 1: TRAINING ---
            model.train()
            running_loss, correct, total = 0.0, 0, 0
            pbar = tqdm(train_loader, desc=f"Эпоха {epoch}/{EPOCHS} [Тренировка]")
            for inputs, labels in pbar:
                try:
                    # Move the batch to the device, then augment it there.
                    inputs = inputs.to(DEVICE, non_blocking=True)
                    labels = labels.to(DEVICE, non_blocking=True)
                    inputs = gpu_train_transforms(inputs)
                    optimizer.zero_grad()
                    # Mixed precision (AMP) to save VRAM and speed things up.
                    with autocast(device_type=DEVICE, enabled=USE_AMP):
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)
                    scaler.scale(loss).backward()
                    scaler.step(optimizer)
                    scaler.update()
                    # Read the loss once; it is used for both the running
                    # average and the progress bar.
                    batch_loss = loss.item()
                    running_loss += batch_loss * inputs.size(0)
                    _, predicted = outputs.max(1)
                    total += labels.size(0)
                    correct += predicted.eq(labels).sum().item()
                    pbar.set_postfix({'loss': f"{batch_loss:.4f}", 'acc': f"{correct/total:.4f}"})
                except RuntimeError as e:
                    # Out-of-memory handler: drop the batch and carry on.
                    if "out of memory" in str(e).lower():
                        print(f"\nВНИМАНИЕ: Нехватка VRAM! Очистка...")
                        if 'outputs' in locals(): del outputs
                        if 'loss' in locals(): del loss
                        torch.cuda.empty_cache()
                        optimizer.zero_grad()
                        continue
                    # Anything else is a real error; keep the original traceback.
                    raise
            train_loss = running_loss / total if total > 0 else 0
            train_acc = correct / total if total > 0 else 0

            # Free memory before validation.
            gc.collect()
            torch.cuda.empty_cache()

            # --- PHASE 2: VALIDATION ---
            model.eval()
            val_loss, val_correct, val_total = 0.0, 0, 0
            with torch.no_grad():
                for val_inputs, val_labels in tqdm(val_loader, desc=f"Эпоха {epoch}/{EPOCHS} [Валидация]", leave=False):
                    val_inputs, val_labels = val_inputs.to(DEVICE), val_labels.to(DEVICE)
                    val_inputs = gpu_val_transforms(val_inputs)
                    with autocast(device_type=DEVICE, enabled=USE_AMP):
                        val_outputs = model(val_inputs)
                        v_loss = criterion(val_outputs, val_labels)
                    val_loss += v_loss.item() * val_inputs.size(0)
                    _, val_predicted = val_outputs.max(1)
                    val_total += val_labels.size(0)
                    val_correct += val_predicted.eq(val_labels).sum().item()
            epoch_val_loss = val_loss / val_total if val_total > 0 else 0
            epoch_val_acc = val_correct / val_total if val_total > 0 else 0

            scheduler.step()
            # From here on the epoch counts as done: the stored epoch number
            # always matches the number of scheduler steps taken.
            last_completed_epoch = epoch
            print(f"🏁 Эпоха {epoch} | Train Loss: {train_loss:.4f} (Acc: {train_acc:.4f}) | Val Loss: {epoch_val_loss:.4f} (Acc: {epoch_val_acc:.4f})")

            # --- PHASE 3: EARLY STOPPING AND SAVING ---
            if epoch_val_loss < best_val_loss:
                best_val_loss = epoch_val_loss
                epochs_no_improve = 0
                # NOTE(review): relative path, resolved against the current
                # working directory, unlike the absolute paths in the config.
                torch.save(model.state_dict(), "../emoset_resnet50_best_2_41M.pth")
                print("Новая лучшая модель найдена! Веса сохранены.")
            else:
                epochs_no_improve += 1
                print(f"Валидация не улучшается {epochs_no_improve}/{PATIENCE} эпох.")
                if epochs_no_improve >= PATIENCE and epoch >= MIN_EPOCHS_BEFORE_STOP:
                    print("\nСРАБОТАЛА ЗАЩИТА ОТ ПЕРЕОБУЧЕНИЯ (Early Stopping)!")
                    break

            # Persist the full session state after every finished epoch.
            _save_resume_checkpoint(epoch)
            # Per-epoch weights kept as a backup.
            torch.save(model.state_dict(), f"../emoset_resnet50_finetuned_ep{epoch}.pth")
            gc.collect()
            torch.cuda.empty_cache()
    # ==========================================
    # 7. MANUAL STOP (CTRL+C)
    # ==========================================
    except KeyboardInterrupt:
        print("\n\nОБУЧЕНИЕ ПРЕРВАНО ВРУЧНУЮ (KeyboardInterrupt)!")
        print(f"Экстренное сохранение состояния конвейера на эпохе {epoch}...")
        # Store the last FINISHED epoch, not the interrupted one. Otherwise a
        # resume would start at epoch + 1, skipping the rest of this epoch and
        # leaving the LR schedule one step behind the epoch counter.
        _save_resume_checkpoint(last_completed_epoch)
        interrupted_weights_path = f"../emoset_resnet50_interrupted_ep{epoch}.pth"
        torch.save(model.state_dict(), interrupted_weights_path)
        print(f"Прогресс безопасно зафиксирован в файле {interrupted_weights_path}. Выходим.")
    # ==========================================
    # 8. FINAL SAVE
    # ==========================================
    # Runs when the loop ended without an exception, which includes the
    # early-stopping `break`; the weights saved here are those of the last
    # epoch that ran.
    else:
        if SAVE_MODEL_PATH.parent.exists():
            torch.save(model.state_dict(), SAVE_MODEL_PATH)
            print(f"\nОБУЧЕНИЕ УСПЕШНО ЗАВЕРШЕНО! Финальная модель: {SAVE_MODEL_PATH}")
        if RESUME_CHECKPOINT.exists():
            RESUME_CHECKPOINT.unlink()  # the resume file is no longer needed
+467
View File
@@ -0,0 +1,467 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "71ef58af",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Используем устройство: cuda\n"
]
}
],
"source": [
"import os\n",
"import torch\n",
"import torch.nn as nn\n",
"from torch.utils.data import Dataset, DataLoader\n",
"import torchvision.transforms as T\n",
"import pandas as pd\n",
"from pathlib import Path\n",
"from PIL import Image\n",
"from tqdm.notebook import tqdm\n",
"import timm\n",
"\n",
"# Проверяем GPU\n",
"DEVICE = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n",
"print(f\"Используем устройство: {DEVICE}\")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "f4ae931c",
"metadata": {},
"outputs": [],
"source": [
"# === НАСТРОЙКИ ДООБУЧЕНИЯ ===\n",
"\n",
"# Абсолютный путь к смонтированному NFS\n",
"DATA_ROOT = Path(\"/home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M\")\n",
"\n",
"# Пути относительно src/scripts/\n",
"PREVIOUS_WEIGHTS = Path(\"../emoset_resnet50_best.pth\")\n",
"SAVE_MODEL_PATH = Path(\"../emoset_resnet50_finetuned_2.41M.pth\")\n",
"\n",
"# Маппинг эмоций в те же индексы (0-7), которые использовались при первоначальном обучении\n",
"EMO_MAP = {\n",
" \"amusement\": 0,\n",
" \"anger\": 1,\n",
" \"awe\": 2,\n",
" \"contentment\": 3,\n",
" \"disgust\": 4,\n",
" \"excitement\": 5,\n",
" \"fear\": 6,\n",
" \"sad\": 7, # В твоем сообщении папка называется \"sad\"\n",
" \"sadness\": 7 # На всякий случай оставляем и классическое название\n",
"}\n",
"\n",
"BATCH_SIZE = 96\n",
"EPOCHS = 15\n",
"LR = 5e-5\n",
"NUM_WORKERS = 42"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "934cfe2c",
"metadata": {},
"outputs": [],
"source": [
"import pickle\n",
"import os\n",
"import torchvision.io as tv_io\n",
"\n",
"# Твои трансформации (только Resize, как мы сделали ранее)\n",
"train_transforms = T.Compose([\n",
" T.Resize((256, 256), antialias=True)\n",
"])\n",
"\n",
"class EmoSetNestedDataset(Dataset):\n",
" def __init__(self, root_dir, transform=None, cache_file=\"../dataset_cache_2.41M.pkl\"):\n",
" self.root_dir = Path(root_dir)\n",
" self.transform = transform\n",
" self.image_paths = []\n",
" self.labels = []\n",
" \n",
" # === ЛОГИКА КЭШИРОВАНИЯ ===\n",
" if os.path.exists(cache_file):\n",
" print(f\"📦 Загрузка списка файлов из локального кэша: {cache_file}...\")\n",
" with open(cache_file, 'rb') as f:\n",
" cache_data = pickle.load(f)\n",
" self.image_paths = cache_data['image_paths']\n",
" self.labels = cache_data['labels']\n",
" print(f\"⚡ Готово! Моментально загружено {len(self.image_paths)} путей.\")\n",
" else:\n",
" print(f\"🔍 Сканирование NFS директории {self.root_dir}...\")\n",
" print(\"Это займет около 8-10 минут. Выполняется один раз...\")\n",
" \n",
" for img_path in self.root_dir.rglob('*.jpg'):\n",
" emotion_folder = img_path.parts[-3].lower()\n",
" if emotion_folder in EMO_MAP:\n",
" self.image_paths.append(str(img_path))\n",
" self.labels.append(EMO_MAP[emotion_folder])\n",
" \n",
" print(f\"💾 Сохранение результатов сканирования в кэш: {cache_file}...\")\n",
" with open(cache_file, 'wb') as f:\n",
" pickle.dump({'image_paths': self.image_paths, 'labels': self.labels}, f)\n",
" \n",
" print(f\"✅ Успешно найдено и закэшировано {len(self.image_paths)} изображений.\")\n",
"\n",
" def __len__(self):\n",
" return len(self.image_paths)\n",
"\n",
" def __getitem__(self, idx):\n",
" img_path = self.image_paths[idx]\n",
" label = self.labels[idx]\n",
" \n",
" try:\n",
" image = tv_io.read_image(str(img_path), mode=tv_io.ImageReadMode.RGB)\n",
" image = image.to(torch.float32) / 255.0\n",
" except Exception as e:\n",
" image = torch.zeros((3, 256, 256), dtype=torch.float32)\n",
" \n",
" if self.transform:\n",
" image = self.transform(image)\n",
" \n",
" return image, label"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "b10adc06",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"📦 Загрузка списка файлов из локального кэша: ../dataset_paths_cache.pkl...\n",
"⚡ Готово! Моментально загружено 2048377 путей.\n",
"Батчей за одну эпоху: 21338\n",
"\n",
"Создание архитектуры ResNet50...\n",
"📝 Чекпоинт прерванной сессии не найден. Проверяем базовые веса...\n",
"УСПЕХ: Найдены предыдущие веса '../emoset_resnet50_best.pth' (из EmoSet-118K). Загружаем...\n"
]
}
],
"source": [
"# Путь к кэш-файлу (лучше положить его в src, рядом со скриптами, чтобы быстро читался)\n",
"CACHE_PATH = Path(\"../dataset_paths_cache.pkl\")\n",
"\n",
"# 1. Загрузка данных напрямую из папок (или из кэша!)\n",
"train_dataset = EmoSetNestedDataset(DATA_ROOT, transform=train_transforms, cache_file=CACHE_PATH)\n",
"\n",
"train_loader = DataLoader(\n",
" train_dataset, \n",
" batch_size=BATCH_SIZE, \n",
" shuffle=True, \n",
" num_workers=NUM_WORKERS,\n",
" pin_memory=True,\n",
" prefetch_factor=1,\n",
" persistent_workers=True\n",
")\n",
"\n",
"print(f\"Батчей за одну эпоху: {len(train_loader)}\")\n",
"\n",
"# Путь к файлу автоматического восстановления (чекпоинт полной сессии)\n",
"RESUME_CHECKPOINT = Path(\"../emoset_resnet50_checkpoint_latest.pth\")\n",
"\n",
"# 2. Инициализация архитектуры модели ResNet-50\n",
"print(\"\\nСоздание архитектуры ResNet50...\")\n",
"model = timm.create_model('resnet50', pretrained=False, num_classes=8)\n",
"model = model.to(DEVICE)\n",
"\n",
"# Инициализируем компоненты оптимизации\n",
"criterion = nn.CrossEntropyLoss()\n",
"optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)\n",
"scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)\n",
"\n",
"# По умолчанию начинаем с 1-й эпохи\n",
"start_epoch = 1\n",
"\n",
"# === ЛОГИКА ВОССТАНОВЛЕНИЯ / СТАРТА ===\n",
"if RESUME_CHECKPOINT.exists():\n",
" print(f\"🔄 ОБНАРУЖЕН ДЕЙСТВУЮЩИЙ ЧЕКПОИНТ: '{RESUME_CHECKPOINT}'\")\n",
" print(\"Восстанавливаем полное состояние сессии...\")\n",
" \n",
" # Загружаем сохраненный словарь состояния\n",
" checkpoint = torch.load(RESUME_CHECKPOINT, map_location=DEVICE)\n",
" \n",
" # Восстанавливаем всё до единого\n",
" model.load_state_dict(checkpoint['model_state_dict'])\n",
" optimizer.load_state_dict(checkpoint['optimizer_state_dict'])\n",
" scheduler.load_state_dict(checkpoint['scheduler_state_dict'])\n",
" start_epoch = checkpoint['epoch'] + 1 # Начинаем со следующей эпохи\n",
" \n",
" print(f\"🚀 УСПЕХ: Сессия восстановлена! Продолжаем обучение с эпохи {start_epoch}\")\n",
"else:\n",
" print(\"📝 Чекпоинт прерванной сессии не найден. Проверяем базовые веса...\")\n",
" if PREVIOUS_WEIGHTS.exists():\n",
" print(f\"УСПЕХ: Найдены предыдущие веса '{PREVIOUS_WEIGHTS}' (из EmoSet-118K). Загружаем...\")\n",
" model.load_state_dict(torch.load(PREVIOUS_WEIGHTS, map_location=DEVICE))\n",
" else:\n",
" print(\"ВНИМАНИЕ: Базовые веса не найдены. Начинаем обучение с нуля (ImageNet pretrained).\")\n",
" model = timm.create_model('resnet50', pretrained=True, num_classes=8).to(DEVICE)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a7480834",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"⏰ Старт обучения. Целевое количество эпох: 15\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Эпоха 1/15 [Тренировка]: 0%| | 0/21338 [00:00<?, ?it/s]"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Эпоха 1/15 [Тренировка]: 13%|█▎ | 2705/21338 [32:27<2:39:51, 1.94it/s, loss=1.6873, acc=0.3036] Corrupt JPEG data: 80 extraneous bytes before marker 0xd9\n",
"Эпоха 1/15 [Тренировка]: 17%|█▋ | 3623/21338 [43:15<4:46:57, 1.03it/s, loss=1.7141, acc=0.3102] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 18%|█▊ | 3741/21338 [44:35<2:23:26, 2.04it/s, loss=1.7848, acc=0.3109] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 19%|█▉ | 4109/21338 [49:12<2:17:42, 2.09it/s, loss=1.8072, acc=0.3133] Corrupt JPEG data: 485 extraneous bytes before marker 0xd9\n",
"Эпоха 1/15 [Тренировка]: 22%|██▏ | 4729/21338 [56:22<3:06:07, 1.49it/s, loss=1.6499, acc=0.3173] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 38%|███▊ | 8033/21338 [1:35:20<2:21:51, 1.56it/s, loss=1.5897, acc=0.3338] Corrupt JPEG data: 41 extraneous bytes before marker 0xd9\n",
"Эпоха 1/15 [Тренировка]: 45%|████▌ | 9684/21338 [1:54:34<3:40:06, 1.13s/it, loss=1.6740, acc=0.3399] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 50%|█████ | 10679/21338 [2:06:15<47:11, 3.76it/s, loss=1.6234, acc=0.3431] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 55%|█████▌ | 11802/21338 [2:19:39<1:50:22, 1.44it/s, loss=1.5677, acc=0.3463] Unknown Adobe color transform code 2\n",
"Эпоха 1/15 [Тренировка]: 67%|██████▋ | 14253/21338 [2:48:12<27:27, 4.30it/s, loss=1.7579, acc=0.3525] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 77%|███████▋ | 16377/21338 [3:13:17<1:06:23, 1.25it/s, loss=1.6855, acc=0.3572] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 92%|█████████▏| 19575/21338 [3:51:14<11:13, 2.62it/s, loss=1.5876, acc=0.3631] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 92%|█████████▏| 19679/21338 [3:52:26<07:42, 3.59it/s, loss=1.7134, acc=0.3633] Invalid SOS parameters for sequential JPEG\n",
"Эпоха 1/15 [Тренировка]: 100%|█████████▉| 21283/21338 [4:11:18<00:20, 2.70it/s, loss=1.4613, acc=0.3658] Unknown Adobe color transform code 2\n",
"Эпоха 1/15 [Тренировка]: 100%|██████████| 21338/21338 [4:11:47<00:00, 1.41it/s, loss=1.6304, acc=0.3659]\n"
]
},
{
"ename": "NameError",
"evalue": "name 'val_loader' is not defined",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mNameError\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[5]\u001b[39m\u001b[32m, line 88\u001b[39m\n\u001b[32m 85\u001b[39m \u001b[38;5;66;03m# ВАЖНО: Если у тебя нет val_loader, создай его (откуси 5-10% от датасета)\u001b[39;00m\n\u001b[32m 86\u001b[39m \u001b[38;5;66;03m# На валидации мы НЕ применяем gpu_transforms (только нормализацию)\u001b[39;00m\n\u001b[32m 87\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m torch.no_grad():\n\u001b[32m---> \u001b[39m\u001b[32m88\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m val_inputs, val_labels \u001b[38;5;129;01min\u001b[39;00m \u001b[43mval_loader\u001b[49m:\n\u001b[32m 89\u001b[39m val_inputs, val_labels = val_inputs.to(DEVICE), val_labels.to(DEVICE)\n\u001b[32m 91\u001b[39m \u001b[38;5;66;03m# Валидация тоже идет в смешанной точности для скорости\u001b[39;00m\n",
"\u001b[31mNameError\u001b[39m: name 'val_loader' is not defined"
]
}
],
"source": [
"import gc\n",
"import torch\n",
"from torch.amp import autocast, GradScaler\n",
"from tqdm import tqdm\n",
"import torchvision.transforms as T\n",
"\n",
"# --- НАСТРОЙКИ EARLY STOPPING ---\n",
"PATIENCE = 4 # Сколько эпох ждем, если валидация не улучшается\n",
"best_val_loss = float('inf')\n",
"epochs_no_improve = 0\n",
"start_epoch = 1\n",
"\n",
"# --- ПЕРЕНОСИМ ТЯЖЕЛЫЕ АУГМЕНТАЦИИ НА GPU ---\n",
"gpu_transforms = torch.nn.Sequential(\n",
" T.RandomCrop((224, 224)),\n",
" T.RandomHorizontalFlip(p=0.5),\n",
" T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),\n",
" T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])\n",
").to(DEVICE)\n",
"# -------------------------------------------\n",
"\n",
"print(f\"⏰ Старт обучения. Целевое количество эпох: {EPOCHS}\")\n",
"scaler = GradScaler()\n",
"\n",
"try:\n",
" for epoch in range(start_epoch, EPOCHS + 1):\n",
" # ==========================================\n",
" # 1. ТРЕНИРОВКА (TRAIN)\n",
" # ==========================================\n",
" model.train()\n",
" running_loss = 0.0\n",
" correct = 0\n",
" total = 0\n",
" \n",
" pbar = tqdm(train_loader, desc=f\"Эпоха {epoch}/{EPOCHS} [Тренировка]\")\n",
" for inputs, labels in pbar:\n",
" try:\n",
" inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)\n",
" \n",
" # Применяем аугментации на лету к батчу (на GPU)\n",
" inputs = gpu_transforms(inputs)\n",
" \n",
" optimizer.zero_grad()\n",
" \n",
" with autocast(device_type=\"cuda\"): # Для PyTorch >= 2.0 лучше указывать \"cuda\"\n",
" outputs = model(inputs)\n",
" loss = criterion(outputs, labels)\n",
" \n",
" scaler.scale(loss).backward()\n",
" scaler.step(optimizer)\n",
" scaler.update()\n",
" \n",
" # Статистика\n",
" running_loss += loss.item() * inputs.size(0)\n",
" _, predicted = outputs.max(1)\n",
" total += labels.size(0)\n",
" correct += predicted.eq(labels).sum().item()\n",
" \n",
" pbar.set_postfix({'loss': f\"{loss.item():.4f}\", 'acc': f\"{correct/total:.4f}\"})\n",
" \n",
" except RuntimeError as e:\n",
" # Обработчик OOM\n",
" if \"out of memory\" in str(e).lower():\n",
" print(f\"\\n⚠️ ВНИМАНИЕ: Нехватка VRAM на батче! Выполняем экстренную очистку...\")\n",
" if 'outputs' in locals(): del outputs\n",
" if 'loss' in locals(): del loss\n",
" torch.cuda.empty_cache()\n",
" optimizer.zero_grad()\n",
" print(\"♻️ Кэш очищен. Батч пропущен. Продолжаем обучение...\")\n",
" continue\n",
" else:\n",
" raise e\n",
" \n",
" epoch_loss = running_loss / total if total > 0 else 0\n",
" epoch_acc = correct / total if total > 0 else 0\n",
" \n",
" # ==========================================\n",
" # 2. ВАЛИДАЦИЯ И EARLY STOPPING\n",
" # ==========================================\n",
" model.eval()\n",
" val_loss = 0.0\n",
" val_correct = 0\n",
" val_total = 0\n",
" \n",
" # ВАЖНО: Если у тебя нет val_loader, создай его (откуси 5-10% от датасета)\n",
" # На валидации мы НЕ применяем gpu_transforms (только нормализацию)\n",
" with torch.no_grad():\n",
" for val_inputs, val_labels in val_loader:\n",
" val_inputs, val_labels = val_inputs.to(DEVICE), val_labels.to(DEVICE)\n",
" \n",
" # Валидация тоже идет в смешанной точности для скорости\n",
" with autocast(device_type=\"cuda\"):\n",
" val_outputs = model(val_inputs)\n",
" v_loss = criterion(val_outputs, val_labels)\n",
" \n",
" val_loss += v_loss.item() * val_inputs.size(0)\n",
" _, val_predicted = val_outputs.max(1)\n",
" val_total += val_labels.size(0)\n",
" val_correct += val_predicted.eq(val_labels).sum().item()\n",
" \n",
" epoch_val_loss = val_loss / val_total if val_total > 0 else 0\n",
" epoch_val_acc = val_correct / val_total if val_total > 0 else 0\n",
" \n",
" scheduler.step()\n",
" print(f\"🏁 Эпоха {epoch} завершена | Train Loss: {epoch_loss:.4f} (Acc: {epoch_acc:.4f}) | Val Loss: {epoch_val_loss:.4f} (Acc: {epoch_val_acc:.4f})\")\n",
" \n",
" # --- ЛОГИКА РАННЕЙ ОСТАНОВКИ ---\n",
" if epoch_val_loss < best_val_loss:\n",
" best_val_loss = epoch_val_loss\n",
" epochs_no_improve = 0\n",
" # Сохраняем идеальные веса, пока сеть не переобучилась\n",
" torch.save(model.state_dict(), \"../emoset_resnet50_best.pth\")\n",
" print(\"🌟 Найдена лучшая модель! Веса сохранены.\")\n",
" else:\n",
" epochs_no_improve += 1\n",
" print(f\"⚠️ Валидационный Loss не улучшается {epochs_no_improve}/{PATIENCE} эпох.\")\n",
" \n",
" # Условие: если переобучение длится долго И мы прошли хотя бы 15 эпох\n",
" if epochs_no_improve >= PATIENCE and epoch >= 15:\n",
" print(\"\\n🛑 СРАБОТАЛА ЗАЩИТА ОТ ПЕРЕОБУЧЕНИЯ (Early Stopping)!\")\n",
" print(\"Модель начала запоминать данные вместо обобщения. Обучение досрочно завершено.\")\n",
" break # Прерываем цикл эпох\n",
" \n",
" # ==========================================\n",
" # 3. РЕГУЛЯРНОЕ СОХРАНЕНИЕ ПРОГРЕССА\n",
" # ==========================================\n",
" checkpoint_state = {\n",
" 'epoch': epoch,\n",
" 'model_state_dict': model.state_dict(),\n",
" 'optimizer_state_dict': optimizer.state_dict(),\n",
" 'scheduler_state_dict': scheduler.state_dict(),\n",
" 'scaler_state_dict': scaler.state_dict(),\n",
" 'loss': epoch_loss,\n",
" 'val_loss': epoch_val_loss\n",
" }\n",
" torch.save(checkpoint_state, RESUME_CHECKPOINT)\n",
" \n",
" # Сохранение весов конкретной эпохи (на всякий случай)\n",
" epoch_weights_path = f\"../emoset_resnet50_finetuned_ep{epoch}.pth\"\n",
" torch.save(model.state_dict(), epoch_weights_path)\n",
" \n",
" gc.collect()\n",
" torch.cuda.empty_cache()\n",
"\n",
"# ==========================================\n",
"# 4. БЕЗОПАСНЫЙ ВЫХОД ПРИ РУЧНОМ ПРЕРЫВАНИИ\n",
"# ==========================================\n",
"except KeyboardInterrupt:\n",
" print(\"\\n\\n🛑 ОБУЧЕНИЕ ПРЕРВАНО ВРУЧНУЮ (KeyboardInterrupt)!\")\n",
" print(f\"💾 Экстренное сохранение состояния конвейера на эпохе {epoch}...\")\n",
" \n",
" # Сохраняем всё, чтобы потом можно было продолжить с этого же места\n",
" checkpoint_state = {\n",
" 'epoch': epoch,\n",
" 'model_state_dict': model.state_dict(),\n",
" 'optimizer_state_dict': optimizer.state_dict(),\n",
" 'scheduler_state_dict': scheduler.state_dict(),\n",
" 'scaler_state_dict': scaler.state_dict()\n",
" }\n",
" torch.save(checkpoint_state, RESUME_CHECKPOINT)\n",
" \n",
" # Сохраняем промежуточные веса на момент остановки\n",
" interrupted_weights_path = f\"../emoset_resnet50_interrupted_ep{epoch}.pth\"\n",
" torch.save(model.state_dict(), interrupted_weights_path)\n",
" \n",
" print(f\"✅ Прогресс безопасно зафиксирован в файле {interrupted_weights_path}. Выходим.\")\n",
"\n",
"# Финальное сохранение (если цикл дошел до конца сам)\n",
"else:\n",
" if SAVE_MODEL_PATH.parent.exists():\n",
" torch.save(model.state_dict(), SAVE_MODEL_PATH)\n",
" print(f\"\\n🎉 ОБУЧЕНИЕ УСПЕШНО ЗАВЕРШЕНО! Финальная модель: {SAVE_MODEL_PATH}\")\n",
" if RESUME_CHECKPOINT.exists():\n",
" RESUME_CHECKPOINT.unlink()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python (thesis)",
"language": "python",
"name": "thesis"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
+134
View File
@@ -0,0 +1,134 @@
import os
import random
from pathlib import Path
from tqdm.notebook import tqdm
import webdataset as wds
# --- SETTINGS ---
# Original dataset directory (on NFS).
DATA_ROOT = Path("/home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M")
# Directory for the finished .tar archives (shards), next to the original
# dataset on NFS.
# NOTE(review): SHARDS_DIR is reassigned further down in this file before any
# shard is written, so this directory is created but appears unused — confirm
# which target is intended.
SHARDS_DIR = Path("/home/zin/projects/Thesis/NFS/Thesis/Emoset/shards-2.41M")
SHARDS_DIR.mkdir(parents=True, exist_ok=True)

# Emotion folder name -> class index ("sad" and "sadness" share index 7).
EMO_MAP = {
    "amusement": 0, "anger": 1, "awe": 2, "contentment": 3,
    "disgust": 4, "excitement": 5, "fear": 6, "sad": 7, "sadness": 7
}

# Number of images per archive.
MAX_SAMPLES_PER_SHARD = 10000

samples = []
print(f"🔍 Сканирование директории {DATA_ROOT}...")
# os.walk is used here instead of rglob (reported to be faster on network disks).
for root, _dirs, files in os.walk(DATA_ROOT):
    # For a file at .../amusement/0/image.jpg the emotion is the parent of the
    # directory that holds the file, i.e. the second-to-last component of
    # `root`. It is the same for every file in `root`, so it is resolved once
    # per directory instead of once per file.
    root_parts = Path(root).parts
    emotion_folder = root_parts[-2].lower() if len(root_parts) >= 2 else ""
    label = EMO_MAP.get(emotion_folder)
    if label is None:  # class index 0 is valid, so compare against None
        continue
    samples.extend(
        (os.path.join(root, file), label)
        for file in files
        if file.lower().endswith('.jpg')
    )
print(f"✅ Найдено изображений: {len(samples)}")

# Global shuffle before packing, so every shard holds a mix of classes.
# The shuffle is unseeded, so shard contents differ between runs.
print("🔀 Перемешиваем датасет...")
random.shuffle(samples)
print("✅ Перемешивание завершено!")
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import webdataset as wds
from PIL import Image
import io
from pathlib import Path
# ВАЖНО: Импортируем базовый tqdm, а не notebook-версию.
# Notebook-версия в мультипроцессинге вызывает зависание графического интерфейса Jupyter.
from tqdm import tqdm
# --- PATHS AND SETTINGS ---
SHARDS_DIR = Path("../../dataset/EmoSet-2.41M-shards")
SHARDS_DIR.mkdir(parents=True, exist_ok=True)
NUM_WORKERS = 50

# 1. Cut the sample list into consecutive chunks, one chunk per shard.
chunks = []
for chunk_start in range(0, len(samples), MAX_SAMPLES_PER_SHARD):
    chunk_end = chunk_start + MAX_SAMPLES_PER_SHARD
    chunks.append(samples[chunk_start:chunk_end])

print(f"📦 Подготовлено {len(chunks)} задач (шардов).")
print(f"💾 Целевая папка (Локальный SSD): {SHARDS_DIR}")
print(f"🚀 Запуск упаковки и сжатия в {NUM_WORKERS} потоков...\n")

# Give tqdm a multiprocessing lock so the progress bars do not garble
# each other.
tqdm.set_lock(mp.RLock())
# 2. Функция, которую выполняет каждый воркер
def build_shard(args):
    """Pack one chunk of images into a single WebDataset ``.tar`` shard.

    Every image is opened with PIL, converted to RGB, resized to 256x256
    with bilinear resampling and re-encoded as JPEG (quality 85). The
    encoded bytes are written to the shard together with the class label.

    Images that cannot be decoded or re-encoded are skipped; the number of
    skipped files (and the first failure) is reported once the shard is done.
    Errors raised while writing to the archive are NOT swallowed: they
    propagate to the caller, because they indicate a broken shard rather
    than a broken input image.

    Args:
        args: ``(shard_idx, chunk)`` tuple, where ``chunk`` is a sequence of
            ``(img_path, label)`` pairs.

    Returns:
        The ``shard_idx`` that was processed.
    """
    shard_idx, chunk = args
    shard_path = SHARDS_DIR / f"emoset-{shard_idx:06d}.tar"

    error_count = 0
    first_error = None  # (path, exception) of the first skipped image

    # Screen row for this shard's progress bar. Row 0 is left to the global
    # bar drawn by the parent process. The row is derived from the shard
    # index, not from the worker identity, so two shards running at the same
    # time may occasionally share a row; this is cosmetic only.
    # leave=False removes the bar once the shard is finished.
    worker_pos = (shard_idx % NUM_WORKERS) + 1

    with wds.TarWriter(str(shard_path)) as sink:
        progress = tqdm(
            chunk,
            desc=f"Шард {shard_idx:03d}",
            position=worker_pos,
            leave=False,
        )
        for i, (img_path, label) in enumerate(progress):
            # Only decoding / re-encoding is guarded: a failure here means
            # the source image is unreadable and can safely be skipped.
            try:
                with Image.open(img_path) as img:
                    resized = img.convert("RGB").resize(
                        (256, 256), Image.Resampling.BILINEAR
                    )
                    with io.BytesIO() as img_byte_arr:
                        resized.save(img_byte_arr, format='JPEG', quality=85)
                        image_data = img_byte_arr.getvalue()
            except Exception as exc:
                error_count += 1
                if first_error is None:
                    first_error = (img_path, exc)
                continue

            # The key keeps the position inside the chunk, so keys of skipped
            # images are simply absent from the shard.
            sink.write({
                "__key__": f"{shard_idx:06d}_{i:05d}",
                "jpg": image_data,
                "cls": label,
            })

    if error_count:
        bad_path, bad_exc = first_error
        tqdm.write(
            f"⚠️ Шард {shard_idx:03d}: пропущено файлов: {error_count} "
            f"(первая ошибка: {bad_path}: {bad_exc!r})"
        )
    return shard_idx
# 3. Launch the worker pool and wait for every shard.
failed_shards = []  # (shard_idx, exception) for every shard whose worker raised
with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
    tasks = [(i, chunk) for i, chunk in enumerate(chunks)]
    # Submit everything up front; remember which shard each future belongs to
    # so that a failure can be attributed to a concrete archive.
    futures = {executor.submit(build_shard, task): task[0] for task in tasks}
    # Global progress bar (position=0, pinned to the first screen row).
    for future in tqdm(as_completed(futures), total=len(tasks), desc="📊 ОБЩИЙ ПРОГРЕСС", position=0, leave=True):
        try:
            future.result()
        except Exception as exc:
            # Keep going with the remaining shards, but do not lose the
            # failure: it is reported in the summary printed after the pool
            # has finished.
            failed_shards.append((futures[future], exc))

# Print blank lines so the final text does not overlap the progress bars.
print("\n" * (NUM_WORKERS + 2))
if failed_shards:
    for shard_idx, exc in sorted(failed_shards, key=lambda item: item[0]):
        print(f"❌ Шард {shard_idx:06d} не собран: {exc!r}")
    print(f"⚠️ Упаковка завершена с ошибками: не собрано шардов {len(failed_shards)} из {len(tasks)}")
else:
    print("🎉 ПАРАЛЛЕЛЬНАЯ УПАКОВКА И СЖАТИЕ ПОЛНОСТЬЮ ЗАВЕРШЕНЫ!")
+919
View File
@@ -0,0 +1,919 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"id": "e6aa65e8",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import random\n",
"from pathlib import Path\n",
"from tqdm.notebook import tqdm\n",
"import webdataset as wds\n",
"\n",
"# --- НАСТРОЙКИ ---\n",
"# Оригинальная папка с датасетом (на NFS)\n",
"DATA_ROOT = Path(\"/home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M\")\n",
"\n",
"# Новая папка, куда мы сложим готовые .tar архивы (шарды)\n",
"# Лучше создать её рядом с оригинальным датасетом на NFS\n",
"SHARDS_DIR = Path(\"/home/zin/projects/Thesis/NFS/Thesis/Emoset/shards-2.41M\")\n",
"SHARDS_DIR.mkdir(parents=True, exist_ok=True)\n",
"\n",
"# Маппинг классов\n",
"EMO_MAP = {\n",
" \"amusement\": 0, \"anger\": 1, \"awe\": 2, \"contentment\": 3,\n",
" \"disgust\": 4, \"excitement\": 5, \"fear\": 6, \"sad\": 7, \"sadness\": 7\n",
"}\n",
"\n",
"# Размер одного архива. 10 000 картинок — идеальный баланс\n",
"MAX_SAMPLES_PER_SHARD = 10000"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "09d0e56c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"🔍 Сканирование директории /home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M...\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[5]\u001b[39m\u001b[32m, line 11\u001b[39m\n\u001b[32m 8\u001b[39m full_path = os.path.join(root, file)\n\u001b[32m 9\u001b[39m \u001b[38;5;66;03m# Извлекаем эмоцию (зависит от структуры папок, берем предпоследнюю папку)\u001b[39;00m\n\u001b[32m 10\u001b[39m \u001b[38;5;66;03m# Путь: .../amusement/0/image.jpg -> root_parts[-2] будет 'amusement'\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m11\u001b[39m path_parts = \u001b[43mPath\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfull_path\u001b[49m\u001b[43m)\u001b[49m.parts\n\u001b[32m 12\u001b[39m emotion_folder = path_parts[-\u001b[32m3\u001b[39m].lower()\n\u001b[32m 14\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m emotion_folder \u001b[38;5;129;01min\u001b[39;00m EMO_MAP:\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:871\u001b[39m, in \u001b[36mPath.__new__\u001b[39m\u001b[34m(cls, *args, **kwargs)\u001b[39m\n\u001b[32m 869\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mcls\u001b[39m \u001b[38;5;129;01mis\u001b[39;00m Path:\n\u001b[32m 870\u001b[39m \u001b[38;5;28mcls\u001b[39m = WindowsPath \u001b[38;5;28;01mif\u001b[39;00m os.name == \u001b[33m'\u001b[39m\u001b[33mnt\u001b[39m\u001b[33m'\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m PosixPath\n\u001b[32m--> \u001b[39m\u001b[32m871\u001b[39m \u001b[38;5;28mself\u001b[39m = \u001b[38;5;28;43mcls\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_from_parts\u001b[49m\u001b[43m(\u001b[49m\u001b[43margs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 872\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mself\u001b[39m._flavour.is_supported:\n\u001b[32m 873\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mNotImplementedError\u001b[39;00m(\u001b[33m\"\u001b[39m\u001b[33mcannot instantiate \u001b[39m\u001b[38;5;132;01m%r\u001b[39;00m\u001b[33m on your system\u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 874\u001b[39m % (\u001b[38;5;28mcls\u001b[39m.\u001b[34m__name__\u001b[39m,))\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:509\u001b[39m, in \u001b[36mPurePath._from_parts\u001b[39m\u001b[34m(cls, args)\u001b[39m\n\u001b[32m 504\u001b[39m \u001b[38;5;129m@classmethod\u001b[39m\n\u001b[32m 505\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m_from_parts\u001b[39m(\u001b[38;5;28mcls\u001b[39m, args):\n\u001b[32m 506\u001b[39m \u001b[38;5;66;03m# We need to call _parse_args on the instance, so as to get the\u001b[39;00m\n\u001b[32m 507\u001b[39m \u001b[38;5;66;03m# right flavour.\u001b[39;00m\n\u001b[32m 508\u001b[39m \u001b[38;5;28mself\u001b[39m = \u001b[38;5;28mobject\u001b[39m.\u001b[34m__new__\u001b[39m(\u001b[38;5;28mcls\u001b[39m)\n\u001b[32m--> \u001b[39m\u001b[32m509\u001b[39m drv, root, parts = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_parse_args\u001b[49m\u001b[43m(\u001b[49m\u001b[43margs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 510\u001b[39m \u001b[38;5;28mself\u001b[39m._drv = drv\n\u001b[32m 511\u001b[39m \u001b[38;5;28mself\u001b[39m._root = root\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:502\u001b[39m, in \u001b[36mPurePath._parse_args\u001b[39m\u001b[34m(cls, args)\u001b[39m\n\u001b[32m 497\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 498\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m(\n\u001b[32m 499\u001b[39m \u001b[33m\"\u001b[39m\u001b[33margument should be a str object or an os.PathLike \u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 500\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mobject returning str, not \u001b[39m\u001b[38;5;132;01m%r\u001b[39;00m\u001b[33m\"\u001b[39m\n\u001b[32m 501\u001b[39m % \u001b[38;5;28mtype\u001b[39m(a))\n\u001b[32m--> \u001b[39m\u001b[32m502\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mcls\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_flavour\u001b[49m\u001b[43m.\u001b[49m\u001b[43mparse_parts\u001b[49m\u001b[43m(\u001b[49m\u001b[43mparts\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:67\u001b[39m, in \u001b[36m_Flavour.parse_parts\u001b[39m\u001b[34m(self, parts)\u001b[39m\n\u001b[32m 65\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m altsep:\n\u001b[32m 66\u001b[39m part = part.replace(altsep, sep)\n\u001b[32m---> \u001b[39m\u001b[32m67\u001b[39m drv, root, rel = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43msplitroot\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpart\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 68\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m sep \u001b[38;5;129;01min\u001b[39;00m rel:\n\u001b[32m 69\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m x \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mreversed\u001b[39m(rel.split(sep)):\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:241\u001b[39m, in \u001b[36m_PosixFlavour.splitroot\u001b[39m\u001b[34m(self, part, sep)\u001b[39m\n\u001b[32m 239\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34msplitroot\u001b[39m(\u001b[38;5;28mself\u001b[39m, part, sep=sep):\n\u001b[32m 240\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m part \u001b[38;5;129;01mand\u001b[39;00m part[\u001b[32m0\u001b[39m] == sep:\n\u001b[32m--> \u001b[39m\u001b[32m241\u001b[39m stripped_part = part.lstrip(sep)\n\u001b[32m 242\u001b[39m \u001b[38;5;66;03m# According to POSIX path resolution:\u001b[39;00m\n\u001b[32m 243\u001b[39m \u001b[38;5;66;03m# http://pubs.opengroup.org/onlinepubs/009695399/basedefs/xbd_chap04.html#tag_04_11\u001b[39;00m\n\u001b[32m 244\u001b[39m \u001b[38;5;66;03m# \"A pathname that begins with two successive slashes may be\u001b[39;00m\n\u001b[32m 245\u001b[39m \u001b[38;5;66;03m# interpreted in an implementation-defined manner, although more\u001b[39;00m\n\u001b[32m 246\u001b[39m \u001b[38;5;66;03m# than two leading slashes shall be treated as a single slash\".\u001b[39;00m\n\u001b[32m 247\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(part) - \u001b[38;5;28mlen\u001b[39m(stripped_part) == \u001b[32m2\u001b[39m:\n",
"\u001b[31mKeyboardInterrupt\u001b[39m: "
]
}
],
"source": [
"samples = []\n",
"\n",
"print(f\"🔍 Сканирование директории {DATA_ROOT}...\")\n",
"# Используем os.walk, он часто работает быстрее rglob на сетевых дисках\n",
"for root, dirs, files in os.walk(DATA_ROOT):\n",
" for file in files:\n",
" if file.lower().endswith('.jpg'):\n",
" full_path = os.path.join(root, file)\n",
    "            # Извлекаем эмоцию (зависит от структуры папок): берём папку на два уровня выше файла\n",
    "            # Путь: .../amusement/0/image.jpg -> path_parts[-3] будет 'amusement'\n",
" path_parts = Path(full_path).parts\n",
" emotion_folder = path_parts[-3].lower()\n",
" \n",
" if emotion_folder in EMO_MAP:\n",
" samples.append((full_path, EMO_MAP[emotion_folder]))\n",
"\n",
"print(f\"✅ Найдено изображений: {len(samples)}\")\n",
"\n",
"# САМЫЙ ВАЖНЫЙ ШАГ: Глобальное перемешивание перед упаковкой\n",
"print(\"🔀 Перемешиваем датасет...\")\n",
"random.shuffle(samples)\n",
"print(\"✅ Перемешивание завершено!\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0fe71d72",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"📦 Подготовлено 205 задач (шардов).\n",
"💾 Целевая папка: ../../dataset/EmoSet-2.41M-shards\n",
"🚀 Запуск упаковки в 42 потоков...\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Exception in thread Thread-4 (ui_thread_func):\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/threading.py\", line 1045, in _bootstrap_inner\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/threading.py\", line 982, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/tmp/ipykernel_16083/731719608.py\", line 72, in ui_thread_func\n",
" File \"/home/zin/projects/Thesis/.venv/lib/python3.11/site-packages/tqdm/notebook.py\", line 223, in __init__\n",
" super().__init__(*args, **kwargs)\n",
" File \"/home/zin/projects/Thesis/.venv/lib/python3.11/site-packages/tqdm/std.py\", line 1001, in __init__\n",
" raise (\n",
"tqdm.std.TqdmKeyError: \"Unknown argument(s): {'color': 'blue'}\"\n",
"Process ForkProcess-39:\n",
"Process ForkProcess-35:\n",
"Process ForkProcess-28:\n",
"Process ForkProcess-29:\n",
"Process ForkProcess-43:\n",
"Process ForkProcess-38:\n",
"Process ForkProcess-36:\n",
"Process ForkProcess-37:\n",
"Process ForkProcess-34:\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-27:\n",
"Process ForkProcess-25:\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-30:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Process ForkProcess-41:\n",
"Process ForkProcess-31:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Process ForkProcess-22:\n",
"Process ForkProcess-32:\n",
"Process ForkProcess-40:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-42:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Process ForkProcess-20:\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-24:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Process ForkProcess-23:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Process ForkProcess-26:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Process ForkProcess-21:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-16:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-13:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Process ForkProcess-9:\n",
"Process ForkProcess-17:\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Process ForkProcess-6:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Process ForkProcess-8:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Process ForkProcess-3:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-10:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Process ForkProcess-7:\n",
"Process ForkProcess-11:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-14:\n",
"Process ForkProcess-4:\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-5:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/threading.py\", line 982, in run\n",
"Process ForkProcess-2:\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-12:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-15:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n",
"Process ForkProcess-18:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n"
]
},
{
"ename": "KeyboardInterrupt",
"evalue": "",
"output_type": "error",
"traceback": [
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
"\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[3]\u001b[39m\u001b[32m, line 102\u001b[39m\n\u001b[32m 101\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m ProcessPoolExecutor(max_workers=NUM_WORKERS) \u001b[38;5;28;01mas\u001b[39;00m executor:\n\u001b[32m--> \u001b[39m\u001b[32m102\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43m_\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mexecutor\u001b[49m\u001b[43m.\u001b[49m\u001b[43mmap\u001b[49m\u001b[43m(\u001b[49m\u001b[43mbuild_shard\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtasks\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 103\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mpass\u001b[39;49;00m \u001b[38;5;66;03m# Просто ждем завершения всех задач\u001b[39;00m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py:620\u001b[39m, in \u001b[36m_chain_from_iterable_of_lists\u001b[39m\u001b[34m(iterable)\u001b[39m\n\u001b[32m 615\u001b[39m \u001b[38;5;250m\u001b[39m\u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m 616\u001b[39m \u001b[33;03mSpecialized implementation of itertools.chain.from_iterable.\u001b[39;00m\n\u001b[32m 617\u001b[39m \u001b[33;03mEach item in *iterable* should be a list. This function is\u001b[39;00m\n\u001b[32m 618\u001b[39m \u001b[33;03mcareful not to keep references to yielded objects.\u001b[39;00m\n\u001b[32m 619\u001b[39m \u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m--> \u001b[39m\u001b[32m620\u001b[39m \u001b[43m\u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43melement\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43miterable\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 621\u001b[39m \u001b[43m \u001b[49m\u001b[43melement\u001b[49m\u001b[43m.\u001b[49m\u001b[43mreverse\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:619\u001b[39m, in \u001b[36mExecutor.map.<locals>.result_iterator\u001b[39m\u001b[34m()\u001b[39m\n\u001b[32m 618\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m619\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m \u001b[43m_result_or_cancel\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfs\u001b[49m\u001b[43m.\u001b[49m\u001b[43mpop\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 620\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:317\u001b[39m, in \u001b[36m_result_or_cancel\u001b[39m\u001b[34m(***failed resolving arguments***)\u001b[39m\n\u001b[32m 316\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m317\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mfut\u001b[49m\u001b[43m.\u001b[49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 318\u001b[39m \u001b[38;5;28;01mfinally\u001b[39;00m:\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:451\u001b[39m, in \u001b[36mFuture.result\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 449\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m.__get_result()\n\u001b[32m--> \u001b[39m\u001b[32m451\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_condition\u001b[49m\u001b[43m.\u001b[49m\u001b[43mwait\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 453\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._state \u001b[38;5;129;01min\u001b[39;00m [CANCELLED, CANCELLED_AND_NOTIFIED]:\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/threading.py:327\u001b[39m, in \u001b[36mCondition.wait\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 326\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m327\u001b[39m \u001b[43mwaiter\u001b[49m\u001b[43m.\u001b[49m\u001b[43macquire\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 328\u001b[39m gotit = \u001b[38;5;28;01mTrue\u001b[39;00m\n",
"\u001b[31mKeyboardInterrupt\u001b[39m: ",
"\nDuring handling of the above exception, another exception occurred:\n",
"\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)",
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[3]\u001b[39m\u001b[32m, line 101\u001b[39m\n\u001b[32m 99\u001b[39m \u001b[38;5;66;03m# 3. Запускаем 42 боевых ядра\u001b[39;00m\n\u001b[32m 100\u001b[39m tasks = [(i, chunk, queue) \u001b[38;5;28;01mfor\u001b[39;00m i, chunk \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28menumerate\u001b[39m(chunks)]\n\u001b[32m--> \u001b[39m\u001b[32m101\u001b[39m \u001b[43m\u001b[49m\u001b[38;5;28;43;01mwith\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mProcessPoolExecutor\u001b[49m\u001b[43m(\u001b[49m\u001b[43mmax_workers\u001b[49m\u001b[43m=\u001b[49m\u001b[43mNUM_WORKERS\u001b[49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mas\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mexecutor\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 102\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43m_\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mexecutor\u001b[49m\u001b[43m.\u001b[49m\u001b[43mmap\u001b[49m\u001b[43m(\u001b[49m\u001b[43mbuild_shard\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtasks\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 103\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mpass\u001b[39;49;00m \u001b[38;5;66;03m# Просто ждем завершения всех задач\u001b[39;00m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:647\u001b[39m, in \u001b[36mExecutor.__exit__\u001b[39m\u001b[34m(self, exc_type, exc_val, exc_tb)\u001b[39m\n\u001b[32m 646\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m__exit__\u001b[39m(\u001b[38;5;28mself\u001b[39m, exc_type, exc_val, exc_tb):\n\u001b[32m--> \u001b[39m\u001b[32m647\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mshutdown\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwait\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m)\u001b[49m\n\u001b[32m 648\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;01mFalse\u001b[39;00m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py:851\u001b[39m, in \u001b[36mProcessPoolExecutor.shutdown\u001b[39m\u001b[34m(self, wait, cancel_futures)\u001b[39m\n\u001b[32m 848\u001b[39m \u001b[38;5;28mself\u001b[39m._executor_manager_thread_wakeup.wakeup()\n\u001b[32m 850\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._executor_manager_thread \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mand\u001b[39;00m wait:\n\u001b[32m--> \u001b[39m\u001b[32m851\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_executor_manager_thread\u001b[49m\u001b[43m.\u001b[49m\u001b[43mjoin\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 852\u001b[39m \u001b[38;5;66;03m# To reduce the risk of opening too many files, remove references to\u001b[39;00m\n\u001b[32m 853\u001b[39m \u001b[38;5;66;03m# objects that use file descriptors.\u001b[39;00m\n\u001b[32m 854\u001b[39m \u001b[38;5;28mself\u001b[39m._executor_manager_thread = \u001b[38;5;28;01mNone\u001b[39;00m\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/threading.py:1119\u001b[39m, in \u001b[36mThread.join\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 1116\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(\u001b[33m\"\u001b[39m\u001b[33mcannot join current thread\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 1118\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1119\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_wait_for_tstate_lock\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1120\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 1121\u001b[39m \u001b[38;5;66;03m# the behavior of a negative timeout isn't documented, but\u001b[39;00m\n\u001b[32m 1122\u001b[39m \u001b[38;5;66;03m# historically .join(timeout=x) for x<0 has acted as if timeout=0\u001b[39;00m\n\u001b[32m 1123\u001b[39m \u001b[38;5;28mself\u001b[39m._wait_for_tstate_lock(timeout=\u001b[38;5;28mmax\u001b[39m(timeout, \u001b[32m0\u001b[39m))\n",
"\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/threading.py:1139\u001b[39m, in \u001b[36mThread._wait_for_tstate_lock\u001b[39m\u001b[34m(self, block, timeout)\u001b[39m\n\u001b[32m 1136\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m\n\u001b[32m 1138\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1139\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[43mlock\u001b[49m\u001b[43m.\u001b[49m\u001b[43macquire\u001b[49m\u001b[43m(\u001b[49m\u001b[43mblock\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m:\n\u001b[32m 1140\u001b[39m lock.release()\n\u001b[32m 1141\u001b[39m \u001b[38;5;28mself\u001b[39m._stop()\n",
"\u001b[31mKeyboardInterrupt\u001b[39m: "
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"Process ForkProcess-19:\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"Traceback (most recent call last):\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n",
" self.run()\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n",
" self._target(*self._args, **self._kwargs)\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 103, in get\n",
" res = self._recv_bytes()\n",
" ^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n",
" call_item = call_queue.get(block=True)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/connection.py\", line 216, in recv_bytes\n",
" buf = self._recv_bytes(maxlength)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n",
" with self._rlock:\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/connection.py\", line 430, in _recv_bytes\n",
" buf = self._recv(4)\n",
" ^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n",
" return self._semlock.__enter__()\n",
" ^^^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n",
" File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/connection.py\", line 395, in _recv\n",
" chunk = read(handle, remaining)\n",
" ^^^^^^^^^^^^^^^^^^^^^^^\n",
"KeyboardInterrupt\n",
"KeyboardInterrupt\n"
]
}
],
"source": [
"import multiprocessing as mp\n",
"from concurrent.futures import ProcessPoolExecutor\n",
"import webdataset as wds\n",
"from PIL import Image\n",
"import io\n",
"import threading\n",
"from pathlib import Path\n",
"\n",
"# Включаем красивую версию tqdm специально для Jupyter\n",
"from tqdm.notebook import tqdm\n",
"\n",
"# --- PATHS AND SETTINGS ---\n",
"SHARDS_DIR = Path(\"../../dataset/EmoSet-2.41M-shards\")\n",
"SHARDS_DIR.mkdir(parents=True, exist_ok=True)  # create the output directory on demand\n",
"\n",
"NUM_WORKERS = 42\n",
"MAX_SAMPLES_PER_SHARD = 10000\n",
"\n",
"# Slice the sample list into consecutive shard-sized chunks\n",
"TOTAL_FILES = len(samples)\n",
"chunks = [\n",
"    samples[start:start + MAX_SAMPLES_PER_SHARD]\n",
"    for start in range(0, TOTAL_FILES, MAX_SAMPLES_PER_SHARD)\n",
"]\n",
"TOTAL_SHARDS = len(chunks)\n",
"\n",
"print(f\"📦 Подготовлено {TOTAL_SHARDS} задач (шардов).\")\n",
"print(f\"💾 Целевая папка: {SHARDS_DIR}\")\n",
"print(f\"🚀 Запуск упаковки в {NUM_WORKERS} потоков...\\n\")\n",
"\n",
"# --- WORKER FUNCTION (executed inside the pool processes) ---\n",
"def build_shard(args):\n",
"    \"\"\"Pack one chunk of samples into a WebDataset tar shard.\n",
"\n",
"    Args:\n",
"        args: tuple ``(shard_idx, chunk, queue)``. ``chunk`` is a sequence of\n",
"            ``(img_path, label)`` pairs; ``queue`` receives the progress\n",
"            messages ``(\"file\", n)`` and ``(\"shard\", 1)``.\n",
"\n",
"    Returns:\n",
"        The ``shard_idx`` of the shard that was written.\n",
"    \"\"\"\n",
"    shard_idx, chunk, queue = args\n",
"    shard_path = SHARDS_DIR / f\"emoset-{shard_idx:06d}.tar\"\n",
"\n",
"    report_every = 50  # batch progress messages so the queue is not flooded\n",
"    pending = 0        # samples handled since the last progress message\n",
"    failed = 0         # samples skipped because they could not be processed\n",
"\n",
"    with wds.TarWriter(str(shard_path)) as sink:\n",
"        for i, (img_path, label) in enumerate(chunk):\n",
"            try:\n",
"                # Re-encode every image as a 256x256 RGB JPEG\n",
"                with Image.open(img_path) as img:\n",
"                    img = img.convert(\"RGB\")\n",
"                    img = img.resize((256, 256), Image.Resampling.BILINEAR)\n",
"                    with io.BytesIO() as img_byte_arr:\n",
"                        img.save(img_byte_arr, format='JPEG', quality=85)\n",
"                        image_data = img_byte_arr.getvalue()\n",
"\n",
"                key = f\"{shard_idx:06d}_{i:05d}\"\n",
"                sink.write({\n",
"                    \"__key__\": key,\n",
"                    \"jpg\": image_data,\n",
"                    \"cls\": label\n",
"                })\n",
"            except Exception:\n",
"                # Best effort: a broken file is skipped, but it is recorded\n",
"                # instead of being dropped silently.\n",
"                failed += 1\n",
"\n",
"            # Count every sample exactly once, whether it succeeded or not, so\n",
"            # the reported total always equals len(chunk) and the bar neither\n",
"            # overshoots nor stalls when some files are broken.\n",
"            pending += 1\n",
"            if pending >= report_every:\n",
"                queue.put((\"file\", pending))\n",
"                pending = 0\n",
"\n",
"    # Flush the tail that did not fill a whole reporting batch\n",
"    if pending:\n",
"        queue.put((\"file\", pending))\n",
"\n",
"    if failed:\n",
"        print(f\"[shard {shard_idx:06d}] skipped {failed} unreadable file(s)\", flush=True)\n",
"\n",
"    # Signal that the whole shard is finished\n",
"    queue.put((\"shard\", 1))\n",
"    return shard_idx\n",
"\n",
"# --- PROGRESS UI (background thread) ---\n",
"def ui_thread_func(q, total_files, total_shards):\n",
"    \"\"\"Render two progress bars driven by messages read from ``q``.\n",
"\n",
"    Each message is a ``(msg_type, count)`` tuple: ``\"file\"`` advances the\n",
"    file bar and ``\"shard\"`` advances the shard bar by ``count``. The string\n",
"    ``\"DONE\"`` ends the loop.\n",
"\n",
"    Args:\n",
"        q: queue the progress messages are read from.\n",
"        total_files: total for the file progress bar.\n",
"        total_shards: total for the shard progress bar.\n",
"    \"\"\"\n",
"    # tqdm spells this keyword ``colour``; ``color`` is rejected as an unknown\n",
"    # argument, which kills this thread before any bar is drawn.\n",
"    pbar_files = tqdm(total=total_files, desc=\"🖼️ Сжато файлов\", colour=\"blue\")\n",
"    pbar_shards = tqdm(total=total_shards, desc=\"📦 Готово архивов\", colour=\"green\")\n",
"\n",
"    try:\n",
"        while True:\n",
"            msg = q.get()\n",
"            if msg == \"DONE\":\n",
"                break\n",
"\n",
"            msg_type, count = msg\n",
"            if msg_type == \"file\":\n",
"                pbar_files.update(count)\n",
"            elif msg_type == \"shard\":\n",
"                pbar_shards.update(count)\n",
"    finally:\n",
"        # Close the bars even if a malformed message raises\n",
"        pbar_files.close()\n",
"        pbar_shards.close()\n",
"\n",
"# === MAIN ENTRY POINT ===\n",
"if __name__ == '__main__':\n",
"    # 1. The manager hosts a queue proxy that can be pickled into the pool\n",
"    #    workers; the context manager shuts its server process down afterwards\n",
"    #    instead of leaking one per cell run.\n",
"    with mp.Manager() as manager:\n",
"        queue = manager.Queue()\n",
"\n",
"        # 2. Start the background thread that draws the progress bars\n",
"        ui_thread = threading.Thread(target=ui_thread_func, args=(queue, TOTAL_FILES, TOTAL_SHARDS))\n",
"        ui_thread.start()\n",
"\n",
"        try:\n",
"            # 3. Fan the shard tasks out over NUM_WORKERS processes\n",
"            tasks = [(i, chunk, queue) for i, chunk in enumerate(chunks)]\n",
"            with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:\n",
"                for _ in executor.map(build_shard, tasks):\n",
"                    pass  # just wait for all tasks; map re-raises worker errors here\n",
"        finally:\n",
"            # 4. Always release the UI thread, even on an error or interrupt;\n",
"            #    otherwise it blocks on q.get() forever.\n",
"            queue.put(\"DONE\")\n",
"            ui_thread.join()\n",
"\n",
"    print(\"\\n🎉 ПАРАЛЛЕЛЬНАЯ УПАКОВКА И СЖАТИЕ ПОЛНОСТЬЮ ЗАВЕРШЕНЫ!\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python (my-python-project)",
"language": "python",
"name": "my-python-project"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}