diff --git a/src/dataset_paths_cache.pkl b/src/dataset_paths_cache.pkl new file mode 100644 index 0000000..010b852 Binary files /dev/null and b/src/dataset_paths_cache.pkl differ diff --git a/src/main.py b/src/main.py index ddc3485..b0da338 100644 --- a/src/main.py +++ b/src/main.py @@ -1,16 +1,15 @@ -import streamlit as st -import streamlit.components.v1 as components import sys import os import subprocess +import streamlit as st +import streamlit.components.v1 as components + from data_loader import load_music_engine, load_emoset_data, load_image_processor from tabs.tab_dataset import render_dataset_tab from tabs.tab_live import render_live_tab -# ---------------------------- -# Запуск приложения -# ---------------------------- +# Костыль для прямого запуска if __name__ == "__main__": if "STREAMLIT_RUN" not in os.environ: os.environ["STREAMLIT_RUN"] = "1" @@ -18,33 +17,26 @@ if __name__ == "__main__": subprocess.run(cmd) sys.exit() -# Автоматическое определение типа устройства через URL query parameters -# Считывание происходит до set_page_config, что позволяет динамически менять layout -viewport = st.query_params.get("viewport", "desktop") -layout_mode = "centered" if viewport == "mobile" else "wide" +viewport_mode = st.query_params.get("viewport", "desktop") +page_layout = "centered" if viewport_mode == "mobile" else "wide" -st.set_page_config(page_title="Thesis Demo", layout=layout_mode) +st.set_page_config(page_title="Thesis Demo", layout=page_layout) -# Внедрение легковесного JavaScript-детектора для определения ширины экрана -# Перезагружает контекст Streamlit один раз при инициализации сессии, исключая циклическую перезагрузку +# Определения ширины экрана и смены верстки components.html( """ """, @@ -52,43 +44,30 @@ components.html( width=0, ) -# Глобальная инъекция базовых CSS-стилей для адаптации медиаконтента st.markdown( """ """, unsafe_allow_html=True ) -# ---------------------------- -# Инициализация движка и данных -# ---------------------------- 
-matcher = load_music_engine() -image_processor = load_image_processor() -image_files, embeddings, labels_array, images_path = load_emoset_data() +# Подгрузка ML-моделей и датасета +music_matcher = load_music_engine() +img_processor = load_image_processor() +emoset_files, emoset_embeddings, emoset_labels, emoset_path = load_emoset_data() -# ---------------------------- -# Интерфейс и Вкладки -# ---------------------------- st.title("Генератор саундтреков (Research Demo)") -# Изменен порядок: Анализ событий стал первой активной вкладкой -tab1, tab2 = st.tabs(["Анализ событий (Свои фото)", "Отладка (Датасет EmoSet)"]) +tab_live, tab_debug = st.tabs(["Анализ событий (Свои фото)", "Отладка (Датасет EmoSet)"]) -with tab1: - if image_processor: - render_live_tab(matcher, image_processor) +with tab_live: + if img_processor: + render_live_tab(music_matcher, img_processor) else: - st.error("Система обработки изображений недоступна (не найдены веса ResNet).") + st.error("Ошибка загрузки: не найдены веса ResNet для image_processor.") -with tab2: - render_dataset_tab(matcher, image_files, embeddings, labels_array, images_path) \ No newline at end of file +with tab_debug: + render_dataset_tab(music_matcher, emoset_files, emoset_embeddings, emoset_labels, emoset_path) \ No newline at end of file diff --git a/src/scripts/finetune.py b/src/scripts/finetune.py new file mode 100644 index 0000000..8c33e20 --- /dev/null +++ b/src/scripts/finetune.py @@ -0,0 +1,314 @@ +import os +import gc +import pickle +import random +from pathlib import Path + +import torch +import torch.nn as nn +from torch.utils.data import Dataset, DataLoader +import torchvision.transforms as T +import torchvision.io as tv_io +from torch.amp import autocast, GradScaler +from tqdm import tqdm +import timm + +# ========================================== +# 1. 
КОНФИГУРАЦИЯ И ПУТИ +# ========================================== +DEVICE = "cuda" if torch.cuda.is_available() else "cpu" +print(f"Используем устройство: {DEVICE}") + +# Путь к огромному датасету на NFS +DATA_ROOT = Path("/home/zin/projects/Thesis/dataset/Original-2.41M") +CACHE_PATH = Path("/home/zin/projects/Thesis/src/dataset_paths_cache.pkl") + +# Пути к моделям +PREVIOUS_WEIGHTS = Path("/home/zin/projects/Thesis/src/emoset_resnet50_best.pth") # Старые веса (118K) +RESUME_CHECKPOINT = Path("/home/zin/projects/Thesis/src/emoset_resnet50_resume.pth") # Файл для восстановления сессии +SAVE_MODEL_PATH = Path("/home/zin/projects/Thesis/src/emoset_resnet50_finetuned_2.41M.pth") # Финальный файл + +EMO_MAP = { + "amusement": 0, "anger": 1, "awe": 2, "contentment": 3, + "disgust": 4, "excitement": 5, "fear": 6, "sad": 7, "sadness": 7 +} + +# --- НАСТРОЙКИ ОБУЧЕНИЯ --- +BATCH_SIZE = 82 +EPOCHS = 15 +LR = 5e-5 # Низкий LR, так как мы делаем Fine-Tuning +NUM_TRAIN_WORKERS = 48 +NUM_VAL_WORKERS = 18 + +# Настройки защиты от переобучения +PATIENCE = 4 +best_val_loss = float('inf') +epochs_no_improve = 0 +start_epoch = 1 + +# ========================================== +# 2. ПОДГОТОВКА ДАННЫХ (БЫСТРЫЙ КЭШ) +# ========================================== +if CACHE_PATH.exists(): + print(f"Загрузка списка файлов из кэша: {CACHE_PATH}...") + with open(CACHE_PATH, 'rb') as f: + cache_data = pickle.load(f) + all_paths = cache_data['image_paths'] + all_labels = cache_data['labels'] + print(f"Готово! 
Моментально загружено {len(all_paths)} путей.") +else: + print(f"Сканирование NFS директории {DATA_ROOT} (Выполняется один раз)...") + all_paths, all_labels = [], [] + for img_path in DATA_ROOT.rglob('*.jpg'): + emotion_folder = img_path.parts[-3].lower() + if emotion_folder in EMO_MAP: + all_paths.append(str(img_path)) + all_labels.append(EMO_MAP[emotion_folder]) + + with open(CACHE_PATH, 'wb') as f: + pickle.dump({'image_paths': all_paths, 'labels': all_labels}, f) + print(f"Сохранено в кэш: {len(all_paths)} изображений.") + +# Разделение на Train / Validation (95% / 5%) +random.seed(42) # Фиксируем сид, чтобы при перезапусках сплит не менялся +combined = list(zip(all_paths, all_labels)) +random.shuffle(combined) +all_paths, all_labels = zip(*combined) + +split_idx = int(len(all_paths) * 0.95) +train_paths, train_labels = all_paths[:split_idx], all_labels[:split_idx] +val_paths, val_labels = all_paths[split_idx:], all_labels[split_idx:] +print(f"Трейн: {len(train_paths)} | Валидация: {len(val_paths)}") + +# ========================================== +# 3. 
DATASET & DATALOADER
+# ==========================================
+class EmoSetDirectDataset(Dataset):
+    """Dataset that decodes images straight from disk paths.
+
+    Each item is an ``(image, label)`` pair where ``image`` is a float32 RGB
+    tensor scaled to [0, 1] and resized to 256x256. Cropping, flipping, colour
+    jitter and normalisation are applied later by the GPU transform pipelines
+    defined below, not here.
+
+    Args:
+        image_paths: sequence of image file paths.
+        labels: sequence of class indices aligned with ``image_paths``.
+    """
+
+    def __init__(self, image_paths, labels):
+        self.image_paths = image_paths
+        self.labels = labels
+        # Only the basic resize runs in the DataLoader workers (on the CPU).
+        self.base_transform = T.Resize((256, 256), antialias=True)
+
+    def __len__(self):
+        return len(self.image_paths)
+
+    def __getitem__(self, idx):
+        path = self.image_paths[idx]
+        try:
+            image = tv_io.read_image(path, mode=tv_io.ImageReadMode.RGB)
+            image = image.to(torch.float32) / 255.0
+            image = self.base_transform(image)
+        except Exception as exc:
+            # Best-effort fallback for broken files: keep the run alive, but make
+            # the substitution visible instead of swallowing it silently.
+            # NOTE(review): the black placeholder is still paired with the real
+            # label, so every unreadable file becomes a noisy training sample.
+            import warnings
+
+            warnings.warn(
+                f"Unreadable image replaced with zeros: {path} ({exc})",
+                RuntimeWarning,
+            )
+            image = torch.zeros((3, 256, 256), dtype=torch.float32)
+        return image, self.labels[idx]
+
+# --- DATA LOADERS WITH PREFETCH ---
+train_loader = DataLoader(
+    EmoSetDirectDataset(train_paths, train_labels),
+    batch_size=BATCH_SIZE,
+    shuffle=True,
+    num_workers=NUM_TRAIN_WORKERS,
+    pin_memory=True,
+    prefetch_factor=2,
+    persistent_workers=True
+)
+
+val_loader = DataLoader(
+    EmoSetDirectDataset(val_paths, val_labels),
+    batch_size=BATCH_SIZE,
+    shuffle=False,
+    num_workers=NUM_VAL_WORKERS,
+    pin_memory=True,
+    prefetch_factor=2,
+    persistent_workers=True
+)
+
+# ==========================================
+# 4. GPU-SIDE AUGMENTATIONS
+# ==========================================
+# NOTE(review): these transforms are applied to a whole batch tensor at once;
+# presumably each call samples a single crop/flip/jitter for the entire batch
+# rather than per image - confirm against the torchvision documentation.
+gpu_train_transforms = torch.nn.Sequential(
+    T.RandomCrop((224, 224)),
+    T.RandomHorizontalFlip(p=0.5),
+    T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
+    T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
+).to(DEVICE)
+
+# Validation uses a deterministic centre crop plus the same normalisation.
+gpu_val_transforms = torch.nn.Sequential(
+    T.CenterCrop((224, 224)),
+    T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
+).to(DEVICE)
+
+# ==========================================
+# 5. 
ИНИЦИАЛИЗАЦИЯ МОДЕЛИ
+# ==========================================
+print("\nСоздание архитектуры ResNet-50...")
+model = timm.create_model('resnet50', pretrained=False, num_classes=8).to(DEVICE)
+
+criterion = nn.CrossEntropyLoss()
+# The optimizer (and through it the scheduler) holds references to the parameters
+# of THIS model object. `model` must therefore never be rebound below: weights
+# are always loaded into the existing instance with load_state_dict().
+optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)
+scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
+scaler = GradScaler()
+
+# --- SEAMLESS SESSION RESUME ---
+if RESUME_CHECKPOINT.exists():
+    # A full-session checkpoint exists: restore model, optimizer, scheduler and
+    # (when present) the AMP scaler and best validation loss, then continue
+    # with the epoch after the one recorded in the checkpoint.
+    print(f"ВОССТАНОВЛЕНИЕ СЕССИИ из: {RESUME_CHECKPOINT}")
+    checkpoint = torch.load(RESUME_CHECKPOINT, map_location=DEVICE)
+    model.load_state_dict(checkpoint['model_state_dict'])
+    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
+    scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
+    if 'scaler_state_dict' in checkpoint:
+        scaler.load_state_dict(checkpoint['scaler_state_dict'])
+    if 'best_val_loss' in checkpoint:
+        best_val_loss = checkpoint['best_val_loss']
+    start_epoch = checkpoint['epoch'] + 1
+    print(f"УСПЕХ: Продолжаем обучение с эпохи {start_epoch}")
+else:
+    print("Чекпоинт сессии не найден. Проверяем наличие базовых весов...")
+    if PREVIOUS_WEIGHTS.exists():
+        # Start from the weights of the earlier (118K) training run.
+        print(f"📥 Загрузка базовых весов (от 118K): {PREVIOUS_WEIGHTS}")
+        model.load_state_dict(torch.load(PREVIOUS_WEIGHTS, map_location=DEVICE))
+    else:
+        print("ВНИМАНИЕ: Базовые веса не найдены. Обучение начнется с нуля (ImageNet).")
+        # Copy the ImageNet-pretrained weights INTO the existing model. Creating
+        # a new model and assigning it to `model` here would leave the optimizer
+        # pointing at the parameters of the discarded network, so the network
+        # actually being trained would never be updated.
+        imagenet_model = timm.create_model('resnet50', pretrained=True, num_classes=8)
+        model.load_state_dict(imagenet_model.state_dict())
+        del imagenet_model
+
+# ==========================================
+# 6. MAIN TRAINING LOOP
+# ==========================================
+if start_epoch > EPOCHS:
+    print(f"\nОбучение уже было завершено (цель: {EPOCHS} эпох).")
+else:
+    print(f"\nСтарт обучения. 
Целевое количество эпох: {EPOCHS}") + + try: + for epoch in range(start_epoch, EPOCHS + 1): + + # --- ФАЗА 1: ТРЕНИРОВКА --- + model.train() + running_loss, correct, total = 0.0, 0, 0 + + pbar = tqdm(train_loader, desc=f"Эпоха {epoch}/{EPOCHS} [Тренировка]") + for inputs, labels in pbar: + try: + # Перенос на GPU и применение быстрых аугментаций + inputs = inputs.to(DEVICE, non_blocking=True) + labels = labels.to(DEVICE, non_blocking=True) + inputs = gpu_train_transforms(inputs) + + optimizer.zero_grad() + + # Смешанная точность (AMP) для экономии VRAM и ускорения + with autocast(device_type="cuda"): + outputs = model(inputs) + loss = criterion(outputs, labels) + + scaler.scale(loss).backward() + scaler.step(optimizer) + scaler.update() + + running_loss += loss.item() * inputs.size(0) + _, predicted = outputs.max(1) + total += labels.size(0) + correct += predicted.eq(labels).sum().item() + + pbar.set_postfix({'loss': f"{loss.item():.4f}", 'acc': f"{correct/total:.4f}"}) + + except RuntimeError as e: + # Хендлер нехватки памяти (OOM) + if "out of memory" in str(e).lower(): + print(f"\nВНИМАНИЕ: Нехватка VRAM! 
Очистка...") + if 'outputs' in locals(): del outputs + if 'loss' in locals(): del loss + torch.cuda.empty_cache() + optimizer.zero_grad() + continue + raise e + + train_loss = running_loss / total if total > 0 else 0 + train_acc = correct / total if total > 0 else 0 + + # Очистка мусора перед валидацией + gc.collect() + torch.cuda.empty_cache() + + # --- ФАЗА 2: ВАЛИДАЦИЯ --- + model.eval() + val_loss, val_correct, val_total = 0.0, 0, 0 + + with torch.no_grad(): + for val_inputs, val_labels in tqdm(val_loader, desc=f"Эпоха {epoch}/{EPOCHS} [Валидация]", leave=False): + val_inputs, val_labels = val_inputs.to(DEVICE), val_labels.to(DEVICE) + val_inputs = gpu_val_transforms(val_inputs) + + with autocast(device_type="cuda"): + val_outputs = model(val_inputs) + v_loss = criterion(val_outputs, val_labels) + + val_loss += v_loss.item() * val_inputs.size(0) + _, val_predicted = val_outputs.max(1) + val_total += val_labels.size(0) + val_correct += val_predicted.eq(val_labels).sum().item() + + epoch_val_loss = val_loss / val_total if val_total > 0 else 0 + epoch_val_acc = val_correct / val_total if val_total > 0 else 0 + + scheduler.step() + print(f"🏁 Эпоха {epoch} | Train Loss: {train_loss:.4f} (Acc: {train_acc:.4f}) | Val Loss: {epoch_val_loss:.4f} (Acc: {epoch_val_acc:.4f})") + + # --- ФАЗА 3: EARLY STOPPING И СОХРАНЕНИЕ --- + if epoch_val_loss < best_val_loss: + best_val_loss = epoch_val_loss + epochs_no_improve = 0 + torch.save(model.state_dict(), "../emoset_resnet50_best_2_41M.pth") + print("Новая лучшая модель найдена! 
Веса сохранены.") + else: + epochs_no_improve += 1 + print(f"Валидация не улучшается {epochs_no_improve}/{PATIENCE} эпох.") + if epochs_no_improve >= PATIENCE and epoch >= 15: # Даем модели минимум 15 эпох на раскачку + print("\nСРАБОТАЛА ЗАЩИТА ОТ ПЕРЕОБУЧЕНИЯ (Early Stopping)!") + break + + # Сохранение полного состояния сессии + checkpoint_state = { + 'epoch': epoch, + 'model_state_dict': model.state_dict(), + 'optimizer_state_dict': optimizer.state_dict(), + 'scheduler_state_dict': scheduler.state_dict(), + 'scaler_state_dict': scaler.state_dict(), + 'best_val_loss': best_val_loss + } + torch.save(checkpoint_state, RESUME_CHECKPOINT) + + # Сохранение весов конкретной эпохи как бэкап + torch.save(model.state_dict(), f"../emoset_resnet50_finetuned_ep{epoch}.pth") + + gc.collect() + torch.cuda.empty_cache() + + # ========================================== + # 7. ПЕРЕХВАТ РУЧНОЙ ОСТАНОВКИ (CTRL+C) + # ========================================== + except KeyboardInterrupt: + print("\n\nОБУЧЕНИЕ ПРЕРВАНО ВРУЧНУЮ (KeyboardInterrupt)!") + print(f"Экстренное сохранение состояния конвейера на эпохе {epoch}...") + + checkpoint_state = { + 'epoch': epoch, 'model_state_dict': model.state_dict(), + 'optimizer_state_dict': optimizer.state_dict(), + 'scheduler_state_dict': scheduler.state_dict(), 'scaler_state_dict': scaler.state_dict(), + 'best_val_loss': best_val_loss + } + torch.save(checkpoint_state, RESUME_CHECKPOINT) + + interrupted_weights_path = f"../emoset_resnet50_interrupted_ep{epoch}.pth" + torch.save(model.state_dict(), interrupted_weights_path) + print(f"Прогресс безопасно зафиксирован в файле {interrupted_weights_path}. Выходим.") + + # ========================================== + # 8. ФИНАЛЬНОЕ СОХРАНЕНИЕ + # ========================================== + else: + if SAVE_MODEL_PATH.parent.exists(): + torch.save(model.state_dict(), SAVE_MODEL_PATH) + print(f"\nОБУЧЕНИЕ УСПЕШНО ЗАВЕРШЕНО! 
Финальная модель: {SAVE_MODEL_PATH}") + if RESUME_CHECKPOINT.exists(): + RESUME_CHECKPOINT.unlink() # Удаляем resume файл за ненадобностью \ No newline at end of file diff --git a/src/scripts/finetune_embeddings.ipynb b/src/scripts/finetune_embeddings.ipynb new file mode 100644 index 0000000..179c048 --- /dev/null +++ b/src/scripts/finetune_embeddings.ipynb @@ -0,0 +1,467 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "71ef58af", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Используем устройство: cuda\n" + ] + } + ], + "source": [ + "import os\n", + "import torch\n", + "import torch.nn as nn\n", + "from torch.utils.data import Dataset, DataLoader\n", + "import torchvision.transforms as T\n", + "import pandas as pd\n", + "from pathlib import Path\n", + "from PIL import Image\n", + "from tqdm.notebook import tqdm\n", + "import timm\n", + "\n", + "# Проверяем GPU\n", + "DEVICE = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + "print(f\"Используем устройство: {DEVICE}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "f4ae931c", + "metadata": {}, + "outputs": [], + "source": [ + "# === НАСТРОЙКИ ДООБУЧЕНИЯ ===\n", + "\n", + "# Абсолютный путь к смонтированному NFS\n", + "DATA_ROOT = Path(\"/home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M\")\n", + "\n", + "# Пути относительно src/scripts/\n", + "PREVIOUS_WEIGHTS = Path(\"../emoset_resnet50_best.pth\")\n", + "SAVE_MODEL_PATH = Path(\"../emoset_resnet50_finetuned_2.41M.pth\")\n", + "\n", + "# Маппинг эмоций в те же индексы (0-7), которые использовались при первоначальном обучении\n", + "EMO_MAP = {\n", + " \"amusement\": 0,\n", + " \"anger\": 1,\n", + " \"awe\": 2,\n", + " \"contentment\": 3,\n", + " \"disgust\": 4,\n", + " \"excitement\": 5,\n", + " \"fear\": 6,\n", + " \"sad\": 7, # В твоем сообщении папка называется \"sad\"\n", + " \"sadness\": 7 # На всякий случай оставляем и классическое 
название\n", + "}\n", + "\n", + "BATCH_SIZE = 96\n", + "EPOCHS = 15\n", + "LR = 5e-5\n", + "NUM_WORKERS = 42" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "934cfe2c", + "metadata": {}, + "outputs": [], + "source": [ + "import pickle\n", + "import os\n", + "import torchvision.io as tv_io\n", + "\n", + "# Твои трансформации (только Resize, как мы сделали ранее)\n", + "train_transforms = T.Compose([\n", + " T.Resize((256, 256), antialias=True)\n", + "])\n", + "\n", + "class EmoSetNestedDataset(Dataset):\n", + " def __init__(self, root_dir, transform=None, cache_file=\"../dataset_cache_2.41M.pkl\"):\n", + " self.root_dir = Path(root_dir)\n", + " self.transform = transform\n", + " self.image_paths = []\n", + " self.labels = []\n", + " \n", + " # === ЛОГИКА КЭШИРОВАНИЯ ===\n", + " if os.path.exists(cache_file):\n", + " print(f\"📦 Загрузка списка файлов из локального кэша: {cache_file}...\")\n", + " with open(cache_file, 'rb') as f:\n", + " cache_data = pickle.load(f)\n", + " self.image_paths = cache_data['image_paths']\n", + " self.labels = cache_data['labels']\n", + " print(f\"⚡ Готово! Моментально загружено {len(self.image_paths)} путей.\")\n", + " else:\n", + " print(f\"🔍 Сканирование NFS директории {self.root_dir}...\")\n", + " print(\"Это займет около 8-10 минут. 
Выполняется один раз...\")\n", + " \n", + " for img_path in self.root_dir.rglob('*.jpg'):\n", + " emotion_folder = img_path.parts[-3].lower()\n", + " if emotion_folder in EMO_MAP:\n", + " self.image_paths.append(str(img_path))\n", + " self.labels.append(EMO_MAP[emotion_folder])\n", + " \n", + " print(f\"💾 Сохранение результатов сканирования в кэш: {cache_file}...\")\n", + " with open(cache_file, 'wb') as f:\n", + " pickle.dump({'image_paths': self.image_paths, 'labels': self.labels}, f)\n", + " \n", + " print(f\"✅ Успешно найдено и закэшировано {len(self.image_paths)} изображений.\")\n", + "\n", + " def __len__(self):\n", + " return len(self.image_paths)\n", + "\n", + " def __getitem__(self, idx):\n", + " img_path = self.image_paths[idx]\n", + " label = self.labels[idx]\n", + " \n", + " try:\n", + " image = tv_io.read_image(str(img_path), mode=tv_io.ImageReadMode.RGB)\n", + " image = image.to(torch.float32) / 255.0\n", + " except Exception as e:\n", + " image = torch.zeros((3, 256, 256), dtype=torch.float32)\n", + " \n", + " if self.transform:\n", + " image = self.transform(image)\n", + " \n", + " return image, label" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "b10adc06", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "📦 Загрузка списка файлов из локального кэша: ../dataset_paths_cache.pkl...\n", + "⚡ Готово! Моментально загружено 2048377 путей.\n", + "Батчей за одну эпоху: 21338\n", + "\n", + "Создание архитектуры ResNet50...\n", + "📝 Чекпоинт прерванной сессии не найден. Проверяем базовые веса...\n", + "УСПЕХ: Найдены предыдущие веса '../emoset_resnet50_best.pth' (из EmoSet-118K). Загружаем...\n" + ] + } + ], + "source": [ + "# Путь к кэш-файлу (лучше положить его в src, рядом со скриптами, чтобы быстро читался)\n", + "CACHE_PATH = Path(\"../dataset_paths_cache.pkl\")\n", + "\n", + "# 1. 
Загрузка данных напрямую из папок (или из кэша!)\n", + "train_dataset = EmoSetNestedDataset(DATA_ROOT, transform=train_transforms, cache_file=CACHE_PATH)\n", + "\n", + "train_loader = DataLoader(\n", + " train_dataset, \n", + " batch_size=BATCH_SIZE, \n", + " shuffle=True, \n", + " num_workers=NUM_WORKERS,\n", + " pin_memory=True,\n", + " prefetch_factor=1,\n", + " persistent_workers=True\n", + ")\n", + "\n", + "print(f\"Батчей за одну эпоху: {len(train_loader)}\")\n", + "\n", + "# Путь к файлу автоматического восстановления (чекпоинт полной сессии)\n", + "RESUME_CHECKPOINT = Path(\"../emoset_resnet50_checkpoint_latest.pth\")\n", + "\n", + "# 2. Инициализация архитектуры модели ResNet-50\n", + "print(\"\\nСоздание архитектуры ResNet50...\")\n", + "model = timm.create_model('resnet50', pretrained=False, num_classes=8)\n", + "model = model.to(DEVICE)\n", + "\n", + "# Инициализируем компоненты оптимизации\n", + "criterion = nn.CrossEntropyLoss()\n", + "optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)\n", + "scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)\n", + "\n", + "# По умолчанию начинаем с 1-й эпохи\n", + "start_epoch = 1\n", + "\n", + "# === ЛОГИКА ВОССТАНОВЛЕНИЯ / СТАРТА ===\n", + "if RESUME_CHECKPOINT.exists():\n", + " print(f\"🔄 ОБНАРУЖЕН ДЕЙСТВУЮЩИЙ ЧЕКПОИНТ: '{RESUME_CHECKPOINT}'\")\n", + " print(\"Восстанавливаем полное состояние сессии...\")\n", + " \n", + " # Загружаем сохраненный словарь состояния\n", + " checkpoint = torch.load(RESUME_CHECKPOINT, map_location=DEVICE)\n", + " \n", + " # Восстанавливаем всё до единого\n", + " model.load_state_dict(checkpoint['model_state_dict'])\n", + " optimizer.load_state_dict(checkpoint['optimizer_state_dict'])\n", + " scheduler.load_state_dict(checkpoint['scheduler_state_dict'])\n", + " start_epoch = checkpoint['epoch'] + 1 # Начинаем со следующей эпохи\n", + " \n", + " print(f\"🚀 УСПЕХ: Сессия восстановлена! 
Продолжаем обучение с эпохи {start_epoch}\")\n", + "else:\n", + " print(\"📝 Чекпоинт прерванной сессии не найден. Проверяем базовые веса...\")\n", + " if PREVIOUS_WEIGHTS.exists():\n", + " print(f\"УСПЕХ: Найдены предыдущие веса '{PREVIOUS_WEIGHTS}' (из EmoSet-118K). Загружаем...\")\n", + " model.load_state_dict(torch.load(PREVIOUS_WEIGHTS, map_location=DEVICE))\n", + " else:\n", + " print(\"ВНИМАНИЕ: Базовые веса не найдены. Начинаем обучение с нуля (ImageNet pretrained).\")\n", + " model = timm.create_model('resnet50', pretrained=True, num_classes=8).to(DEVICE)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "a7480834", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "⏰ Старт обучения. Целевое количество эпох: 15\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Эпоха 1/15 [Тренировка]: 0%| | 0/21338 [00:00 \u001b[39m\u001b[32m88\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m val_inputs, val_labels \u001b[38;5;129;01min\u001b[39;00m \u001b[43mval_loader\u001b[49m:\n\u001b[32m 89\u001b[39m val_inputs, val_labels = val_inputs.to(DEVICE), val_labels.to(DEVICE)\n\u001b[32m 91\u001b[39m \u001b[38;5;66;03m# Валидация тоже идет в смешанной точности для скорости\u001b[39;00m\n", + "\u001b[31mNameError\u001b[39m: name 'val_loader' is not defined" + ] + } + ], + "source": [ + "import gc\n", + "import torch\n", + "from torch.amp import autocast, GradScaler\n", + "from tqdm import tqdm\n", + "import torchvision.transforms as T\n", + "\n", + "# --- НАСТРОЙКИ EARLY STOPPING ---\n", + "PATIENCE = 4 # Сколько эпох ждем, если валидация не улучшается\n", + "best_val_loss = float('inf')\n", + "epochs_no_improve = 0\n", + "start_epoch = 1\n", + "\n", + "# --- ПЕРЕНОСИМ ТЯЖЕЛЫЕ АУГМЕНТАЦИИ НА GPU ---\n", + "gpu_transforms = torch.nn.Sequential(\n", + " T.RandomCrop((224, 224)),\n", + " T.RandomHorizontalFlip(p=0.5),\n", + " T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),\n", 
+ " T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])\n", + ").to(DEVICE)\n", + "# -------------------------------------------\n", + "\n", + "print(f\"⏰ Старт обучения. Целевое количество эпох: {EPOCHS}\")\n", + "scaler = GradScaler()\n", + "\n", + "try:\n", + " for epoch in range(start_epoch, EPOCHS + 1):\n", + " # ==========================================\n", + " # 1. ТРЕНИРОВКА (TRAIN)\n", + " # ==========================================\n", + " model.train()\n", + " running_loss = 0.0\n", + " correct = 0\n", + " total = 0\n", + " \n", + " pbar = tqdm(train_loader, desc=f\"Эпоха {epoch}/{EPOCHS} [Тренировка]\")\n", + " for inputs, labels in pbar:\n", + " try:\n", + " inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)\n", + " \n", + " # Применяем аугментации на лету к батчу (на GPU)\n", + " inputs = gpu_transforms(inputs)\n", + " \n", + " optimizer.zero_grad()\n", + " \n", + " with autocast(device_type=\"cuda\"): # Для PyTorch >= 2.0 лучше указывать \"cuda\"\n", + " outputs = model(inputs)\n", + " loss = criterion(outputs, labels)\n", + " \n", + " scaler.scale(loss).backward()\n", + " scaler.step(optimizer)\n", + " scaler.update()\n", + " \n", + " # Статистика\n", + " running_loss += loss.item() * inputs.size(0)\n", + " _, predicted = outputs.max(1)\n", + " total += labels.size(0)\n", + " correct += predicted.eq(labels).sum().item()\n", + " \n", + " pbar.set_postfix({'loss': f\"{loss.item():.4f}\", 'acc': f\"{correct/total:.4f}\"})\n", + " \n", + " except RuntimeError as e:\n", + " # Обработчик OOM\n", + " if \"out of memory\" in str(e).lower():\n", + " print(f\"\\n⚠️ ВНИМАНИЕ: Нехватка VRAM на батче! Выполняем экстренную очистку...\")\n", + " if 'outputs' in locals(): del outputs\n", + " if 'loss' in locals(): del loss\n", + " torch.cuda.empty_cache()\n", + " optimizer.zero_grad()\n", + " print(\"♻️ Кэш очищен. Батч пропущен. 
Продолжаем обучение...\")\n", + " continue\n", + " else:\n", + " raise e\n", + " \n", + " epoch_loss = running_loss / total if total > 0 else 0\n", + " epoch_acc = correct / total if total > 0 else 0\n", + " \n", + " # ==========================================\n", + " # 2. ВАЛИДАЦИЯ И EARLY STOPPING\n", + " # ==========================================\n", + " model.eval()\n", + " val_loss = 0.0\n", + " val_correct = 0\n", + " val_total = 0\n", + " \n", + " # ВАЖНО: Если у тебя нет val_loader, создай его (откуси 5-10% от датасета)\n", + " # На валидации мы НЕ применяем gpu_transforms (только нормализацию)\n", + " with torch.no_grad():\n", + " for val_inputs, val_labels in val_loader:\n", + " val_inputs, val_labels = val_inputs.to(DEVICE), val_labels.to(DEVICE)\n", + " \n", + " # Валидация тоже идет в смешанной точности для скорости\n", + " with autocast(device_type=\"cuda\"):\n", + " val_outputs = model(val_inputs)\n", + " v_loss = criterion(val_outputs, val_labels)\n", + " \n", + " val_loss += v_loss.item() * val_inputs.size(0)\n", + " _, val_predicted = val_outputs.max(1)\n", + " val_total += val_labels.size(0)\n", + " val_correct += val_predicted.eq(val_labels).sum().item()\n", + " \n", + " epoch_val_loss = val_loss / val_total if val_total > 0 else 0\n", + " epoch_val_acc = val_correct / val_total if val_total > 0 else 0\n", + " \n", + " scheduler.step()\n", + " print(f\"🏁 Эпоха {epoch} завершена | Train Loss: {epoch_loss:.4f} (Acc: {epoch_acc:.4f}) | Val Loss: {epoch_val_loss:.4f} (Acc: {epoch_val_acc:.4f})\")\n", + " \n", + " # --- ЛОГИКА РАННЕЙ ОСТАНОВКИ ---\n", + " if epoch_val_loss < best_val_loss:\n", + " best_val_loss = epoch_val_loss\n", + " epochs_no_improve = 0\n", + " # Сохраняем идеальные веса, пока сеть не переобучилась\n", + " torch.save(model.state_dict(), \"../emoset_resnet50_best.pth\")\n", + " print(\"🌟 Найдена лучшая модель! 
Веса сохранены.\")\n", + " else:\n", + " epochs_no_improve += 1\n", + " print(f\"⚠️ Валидационный Loss не улучшается {epochs_no_improve}/{PATIENCE} эпох.\")\n", + " \n", + " # Условие: если переобучение длится долго И мы прошли хотя бы 15 эпох\n", + " if epochs_no_improve >= PATIENCE and epoch >= 15:\n", + " print(\"\\n🛑 СРАБОТАЛА ЗАЩИТА ОТ ПЕРЕОБУЧЕНИЯ (Early Stopping)!\")\n", + " print(\"Модель начала запоминать данные вместо обобщения. Обучение досрочно завершено.\")\n", + " break # Прерываем цикл эпох\n", + " \n", + " # ==========================================\n", + " # 3. РЕГУЛЯРНОЕ СОХРАНЕНИЕ ПРОГРЕССА\n", + " # ==========================================\n", + " checkpoint_state = {\n", + " 'epoch': epoch,\n", + " 'model_state_dict': model.state_dict(),\n", + " 'optimizer_state_dict': optimizer.state_dict(),\n", + " 'scheduler_state_dict': scheduler.state_dict(),\n", + " 'scaler_state_dict': scaler.state_dict(),\n", + " 'loss': epoch_loss,\n", + " 'val_loss': epoch_val_loss\n", + " }\n", + " torch.save(checkpoint_state, RESUME_CHECKPOINT)\n", + " \n", + " # Сохранение весов конкретной эпохи (на всякий случай)\n", + " epoch_weights_path = f\"../emoset_resnet50_finetuned_ep{epoch}.pth\"\n", + " torch.save(model.state_dict(), epoch_weights_path)\n", + " \n", + " gc.collect()\n", + " torch.cuda.empty_cache()\n", + "\n", + "# ==========================================\n", + "# 4. 
БЕЗОПАСНЫЙ ВЫХОД ПРИ РУЧНОМ ПРЕРЫВАНИИ\n", + "# ==========================================\n", + "except KeyboardInterrupt:\n", + " print(\"\\n\\n🛑 ОБУЧЕНИЕ ПРЕРВАНО ВРУЧНУЮ (KeyboardInterrupt)!\")\n", + " print(f\"💾 Экстренное сохранение состояния конвейера на эпохе {epoch}...\")\n", + " \n", + " # Сохраняем всё, чтобы потом можно было продолжить с этого же места\n", + " checkpoint_state = {\n", + " 'epoch': epoch,\n", + " 'model_state_dict': model.state_dict(),\n", + " 'optimizer_state_dict': optimizer.state_dict(),\n", + " 'scheduler_state_dict': scheduler.state_dict(),\n", + " 'scaler_state_dict': scaler.state_dict()\n", + " }\n", + " torch.save(checkpoint_state, RESUME_CHECKPOINT)\n", + " \n", + " # Сохраняем промежуточные веса на момент остановки\n", + " interrupted_weights_path = f\"../emoset_resnet50_interrupted_ep{epoch}.pth\"\n", + " torch.save(model.state_dict(), interrupted_weights_path)\n", + " \n", + " print(f\"✅ Прогресс безопасно зафиксирован в файле {interrupted_weights_path}. Выходим.\")\n", + "\n", + "# Финальное сохранение (если цикл дошел до конца сам)\n", + "else:\n", + " if SAVE_MODEL_PATH.parent.exists():\n", + " torch.save(model.state_dict(), SAVE_MODEL_PATH)\n", + " print(f\"\\n🎉 ОБУЧЕНИЕ УСПЕШНО ЗАВЕРШЕНО! 
Финальная модель: {SAVE_MODEL_PATH}\")\n", + " if RESUME_CHECKPOINT.exists():\n", + " RESUME_CHECKPOINT.unlink()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python (thesis)", + "language": "python", + "name": "thesis" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/scripts/preprocessing.py b/src/scripts/preprocessing.py new file mode 100644 index 0000000..f30e904 --- /dev/null +++ b/src/scripts/preprocessing.py @@ -0,0 +1,134 @@ +import os +import random +from pathlib import Path +from tqdm.notebook import tqdm +import webdataset as wds + +# --- НАСТРОЙКИ --- +# Оригинальная папка с датасетом (на NFS) +DATA_ROOT = Path("/home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M") + +# Новая папка, куда мы сложим готовые .tar архивы (шарды) +# Лучше создать её рядом с оригинальным датасетом на NFS +SHARDS_DIR = Path("/home/zin/projects/Thesis/NFS/Thesis/Emoset/shards-2.41M") +SHARDS_DIR.mkdir(parents=True, exist_ok=True) + +# Маппинг классов +EMO_MAP = { + "amusement": 0, "anger": 1, "awe": 2, "contentment": 3, + "disgust": 4, "excitement": 5, "fear": 6, "sad": 7, "sadness": 7 +} + +# Размер одного архива. 
10 000 картинок — идеальный баланс
+MAX_SAMPLES_PER_SHARD = 10000
+
+samples = []
+
+print(f"🔍 Сканирование директории {DATA_ROOT}...")
+# os.walk is used instead of Path.rglob; the author found it faster on network drives.
+for root, _dirs, files in os.walk(DATA_ROOT):
+    # Expected layout: .../<emotion>/<subdir>/<image>.jpg, e.g.
+    # .../amusement/0/image.jpg. The emotion is therefore the parent of the
+    # directory that holds the image. It is resolved once per directory rather
+    # than once per file.
+    root_parts = Path(root).parts
+    if len(root_parts) < 2:
+        continue
+    emotion_folder = root_parts[-2].lower()
+    if emotion_folder not in EMO_MAP:
+        continue
+    label = EMO_MAP[emotion_folder]
+
+    for file in files:
+        if file.lower().endswith('.jpg'):
+            samples.append((os.path.join(root, file), label))
+
+print(f"✅ Найдено изображений: {len(samples)}")
+
+# Global shuffle before packing, so that every shard mixes all classes.
+# os.walk order depends on the filesystem, so the list is sorted first and then
+# shuffled with a fixed seed: re-running the script yields identical shards.
+print("🔀 Перемешиваем датасет...")
+samples.sort()
+random.seed(42)
+random.shuffle(samples)
+print("✅ Перемешивание завершено!")
+
+import multiprocessing as mp
+from concurrent.futures import ProcessPoolExecutor, as_completed
+import webdataset as wds
+from PIL import Image
+import io
+from pathlib import Path
+
+# IMPORTANT: import the plain tqdm here, not the notebook flavour (per the
+# author, the notebook version hangs the Jupyter UI under multiprocessing).
+from tqdm import tqdm
+
+# --- PATHS AND SETTINGS ---
+# NOTE(review): this rebinds SHARDS_DIR, which was already set (and created)
+# earlier in the script with an NFS path; from here on this relative path is used.
+SHARDS_DIR = Path("../../dataset/EmoSet-2.41M-shards")
+SHARDS_DIR.mkdir(parents=True, exist_ok=True)
+
+NUM_WORKERS = 50
+
+# 1. Split the sample list into shard-sized chunks.
+chunks = [samples[i:i + MAX_SAMPLES_PER_SHARD] for i in range(0, len(samples), MAX_SAMPLES_PER_SHARD)]
+
+print(f"📦 Подготовлено {len(chunks)} задач (шардов).")
+print(f"💾 Целевая папка (Локальный SSD): {SHARDS_DIR}")
+print(f"🚀 Запуск упаковки и сжатия в {NUM_WORKERS} потоков...\n")
+
+# Share one lock between processes so concurrent progress bars do not garble each other.
+tqdm.set_lock(mp.RLock())
+
+# 2. 
# Worker function: executed by the pool processes, one call per shard.
def _encode_jpeg(img_path, size=(256, 256), quality=85):
    """Load an image, convert it to RGB, resize it and return JPEG bytes.

    Args:
        img_path: Path of the source image file.
        size: Target ``(width, height)`` passed to ``Image.resize``.
        quality: JPEG quality passed to ``Image.save``.

    Returns:
        The re-encoded image as ``bytes``.

    Raises:
        Whatever Pillow raises for an unreadable or corrupt file; the caller
        decides how to handle it.
    """
    with Image.open(img_path) as src:
        # convert() and resize() return new images, so the result stays usable
        # after the source file handle is closed.
        resized = src.convert("RGB").resize(size, Image.Resampling.BILINEAR)

    with io.BytesIO() as buffer:
        resized.save(buffer, format='JPEG', quality=quality)
        return buffer.getvalue()


def build_shard(args):
    """Pack one chunk of samples into a single WebDataset ``.tar`` shard.

    Args:
        args: A ``(shard_idx, chunk)`` tuple. ``shard_idx`` is the integer
            shard number used in the file name and sample keys; ``chunk`` is
            an iterable of ``(img_path, label)`` pairs.

    Returns:
        ``shard_idx``, so the caller can tell which task finished.

    Images that cannot be read or re-encoded are skipped (best effort), and a
    one-line summary is printed for the shard when that happens. Errors while
    writing to the tar file are NOT swallowed: they mean the shard itself is
    damaged and must propagate to the caller's future.
    """
    shard_idx, chunk = args
    shard_path = SHARDS_DIR / f"emoset-{shard_idx:06d}.tar"

    success_count = 0
    error_count = 0
    first_error = None  # (path, exception) of the first skipped image

    # Progress-bar row for this shard. Row 0 is left to the global bar drawn
    # by the parent; rows 1..NUM_WORKERS are reused via the modulo so the
    # screen does not grow with the number of shards. NOTE(review): this is a
    # heuristic - two shards processed at the same time can map to the same
    # row, because shards are not pinned to a particular process.
    # leave=False removes the bar once the shard is finished.
    worker_pos = (shard_idx % NUM_WORKERS) + 1

    with wds.TarWriter(str(shard_path)) as sink:
        progress = tqdm(chunk, desc=f"Шард {shard_idx:03d}", position=worker_pos, leave=False)
        for i, (img_path, label) in enumerate(progress):
            # Only the decode/re-encode step is best-effort.
            try:
                image_data = _encode_jpeg(img_path)
            except Exception as exc:
                error_count += 1
                if first_error is None:
                    first_error = (img_path, exc)
                continue

            # The key keeps the index inside the chunk, so skipped images
            # leave gaps in the numbering (same as before).
            sink.write({
                "__key__": f"{shard_idx:06d}_{i:05d}",
                "jpg": image_data,
                "cls": label
            })
            success_count += 1

    if error_count:
        # tqdm.write prints without breaking the active progress bars.
        bad_path, bad_exc = first_error
        tqdm.write(
            f"⚠️ Шард {shard_idx:06d}: записано {success_count}, "
            f"пропущено {error_count}; первая ошибка: {bad_path} ({bad_exc!r})"
        )

    return shard_idx

# 3.
Запускаем армию воркеров +with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor: + tasks = [(i, chunk) for i, chunk in enumerate(chunks)] + + # Отправляем задачи в пул + futures = [executor.submit(build_shard, task) for task in tasks] + + # ГЛАВНЫЙ прогресс-бар (position=0, всегда висит на самой первой строчке) + for future in tqdm(as_completed(futures), total=len(tasks), desc="📊 ОБЩИЙ ПРОГРЕСС", position=0, leave=True): + try: + future.result() + except Exception: + pass + +# Печатаем пару пустых строк, чтобы финальный текст не налез на бары +print("\n" * (NUM_WORKERS + 2)) +print("🎉 ПАРАЛЛЕЛЬНАЯ УПАКОВКА И СЖАТИЕ ПОЛНОСТЬЮ ЗАВЕРШЕНЫ!") \ No newline at end of file diff --git a/src/scripts/preprocessing_2.1M.ipynb b/src/scripts/preprocessing_2.1M.ipynb new file mode 100644 index 0000000..05e7c1e --- /dev/null +++ b/src/scripts/preprocessing_2.1M.ipynb @@ -0,0 +1,919 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 4, + "id": "e6aa65e8", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import random\n", + "from pathlib import Path\n", + "from tqdm.notebook import tqdm\n", + "import webdataset as wds\n", + "\n", + "# --- НАСТРОЙКИ ---\n", + "# Оригинальная папка с датасетом (на NFS)\n", + "DATA_ROOT = Path(\"/home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M\")\n", + "\n", + "# Новая папка, куда мы сложим готовые .tar архивы (шарды)\n", + "# Лучше создать её рядом с оригинальным датасетом на NFS\n", + "SHARDS_DIR = Path(\"/home/zin/projects/Thesis/NFS/Thesis/Emoset/shards-2.41M\")\n", + "SHARDS_DIR.mkdir(parents=True, exist_ok=True)\n", + "\n", + "# Маппинг классов\n", + "EMO_MAP = {\n", + " \"amusement\": 0, \"anger\": 1, \"awe\": 2, \"contentment\": 3,\n", + " \"disgust\": 4, \"excitement\": 5, \"fear\": 6, \"sad\": 7, \"sadness\": 7\n", + "}\n", + "\n", + "# Размер одного архива. 
10 000 картинок — идеальный баланс\n", + "MAX_SAMPLES_PER_SHARD = 10000" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "09d0e56c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "🔍 Сканирование директории /home/zin/projects/Thesis/NFS/Thesis/Emoset/Original-2.41M...\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[5]\u001b[39m\u001b[32m, line 11\u001b[39m\n\u001b[32m 8\u001b[39m full_path = os.path.join(root, file)\n\u001b[32m 9\u001b[39m \u001b[38;5;66;03m# Извлекаем эмоцию (зависит от структуры папок, берем предпоследнюю папку)\u001b[39;00m\n\u001b[32m 10\u001b[39m \u001b[38;5;66;03m# Путь: .../amusement/0/image.jpg -> root_parts[-2] будет 'amusement'\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m11\u001b[39m path_parts = \u001b[43mPath\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfull_path\u001b[49m\u001b[43m)\u001b[49m.parts\n\u001b[32m 12\u001b[39m emotion_folder = path_parts[-\u001b[32m3\u001b[39m].lower()\n\u001b[32m 14\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m emotion_folder \u001b[38;5;129;01min\u001b[39;00m EMO_MAP:\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:871\u001b[39m, in \u001b[36mPath.__new__\u001b[39m\u001b[34m(cls, *args, **kwargs)\u001b[39m\n\u001b[32m 869\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mcls\u001b[39m \u001b[38;5;129;01mis\u001b[39;00m Path:\n\u001b[32m 870\u001b[39m \u001b[38;5;28mcls\u001b[39m = WindowsPath \u001b[38;5;28;01mif\u001b[39;00m os.name == \u001b[33m'\u001b[39m\u001b[33mnt\u001b[39m\u001b[33m'\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m PosixPath\n\u001b[32m--> 
\u001b[39m\u001b[32m871\u001b[39m \u001b[38;5;28mself\u001b[39m = \u001b[38;5;28;43mcls\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_from_parts\u001b[49m\u001b[43m(\u001b[49m\u001b[43margs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 872\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mself\u001b[39m._flavour.is_supported:\n\u001b[32m 873\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mNotImplementedError\u001b[39;00m(\u001b[33m\"\u001b[39m\u001b[33mcannot instantiate \u001b[39m\u001b[38;5;132;01m%r\u001b[39;00m\u001b[33m on your system\u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 874\u001b[39m % (\u001b[38;5;28mcls\u001b[39m.\u001b[34m__name__\u001b[39m,))\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:509\u001b[39m, in \u001b[36mPurePath._from_parts\u001b[39m\u001b[34m(cls, args)\u001b[39m\n\u001b[32m 504\u001b[39m \u001b[38;5;129m@classmethod\u001b[39m\n\u001b[32m 505\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m_from_parts\u001b[39m(\u001b[38;5;28mcls\u001b[39m, args):\n\u001b[32m 506\u001b[39m \u001b[38;5;66;03m# We need to call _parse_args on the instance, so as to get the\u001b[39;00m\n\u001b[32m 507\u001b[39m \u001b[38;5;66;03m# right flavour.\u001b[39;00m\n\u001b[32m 508\u001b[39m \u001b[38;5;28mself\u001b[39m = \u001b[38;5;28mobject\u001b[39m.\u001b[34m__new__\u001b[39m(\u001b[38;5;28mcls\u001b[39m)\n\u001b[32m--> \u001b[39m\u001b[32m509\u001b[39m drv, root, parts = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_parse_args\u001b[49m\u001b[43m(\u001b[49m\u001b[43margs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 510\u001b[39m \u001b[38;5;28mself\u001b[39m._drv = drv\n\u001b[32m 511\u001b[39m \u001b[38;5;28mself\u001b[39m._root = root\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:502\u001b[39m, in \u001b[36mPurePath._parse_args\u001b[39m\u001b[34m(cls, 
args)\u001b[39m\n\u001b[32m 497\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 498\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m(\n\u001b[32m 499\u001b[39m \u001b[33m\"\u001b[39m\u001b[33margument should be a str object or an os.PathLike \u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 500\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mobject returning str, not \u001b[39m\u001b[38;5;132;01m%r\u001b[39;00m\u001b[33m\"\u001b[39m\n\u001b[32m 501\u001b[39m % \u001b[38;5;28mtype\u001b[39m(a))\n\u001b[32m--> \u001b[39m\u001b[32m502\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mcls\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_flavour\u001b[49m\u001b[43m.\u001b[49m\u001b[43mparse_parts\u001b[49m\u001b[43m(\u001b[49m\u001b[43mparts\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:67\u001b[39m, in \u001b[36m_Flavour.parse_parts\u001b[39m\u001b[34m(self, parts)\u001b[39m\n\u001b[32m 65\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m altsep:\n\u001b[32m 66\u001b[39m part = part.replace(altsep, sep)\n\u001b[32m---> \u001b[39m\u001b[32m67\u001b[39m drv, root, rel = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43msplitroot\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpart\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 68\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m sep \u001b[38;5;129;01min\u001b[39;00m rel:\n\u001b[32m 69\u001b[39m \u001b[38;5;28;01mfor\u001b[39;00m x \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mreversed\u001b[39m(rel.split(sep)):\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/pathlib.py:241\u001b[39m, in \u001b[36m_PosixFlavour.splitroot\u001b[39m\u001b[34m(self, part, sep)\u001b[39m\n\u001b[32m 239\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34msplitroot\u001b[39m(\u001b[38;5;28mself\u001b[39m, part, sep=sep):\n\u001b[32m 240\u001b[39m 
\u001b[38;5;28;01mif\u001b[39;00m part \u001b[38;5;129;01mand\u001b[39;00m part[\u001b[32m0\u001b[39m] == sep:\n\u001b[32m--> \u001b[39m\u001b[32m241\u001b[39m stripped_part = part.lstrip(sep)\n\u001b[32m 242\u001b[39m \u001b[38;5;66;03m# According to POSIX path resolution:\u001b[39;00m\n\u001b[32m 243\u001b[39m \u001b[38;5;66;03m# http://pubs.opengroup.org/onlinepubs/009695399/basedefs/xbd_chap04.html#tag_04_11\u001b[39;00m\n\u001b[32m 244\u001b[39m \u001b[38;5;66;03m# \"A pathname that begins with two successive slashes may be\u001b[39;00m\n\u001b[32m 245\u001b[39m \u001b[38;5;66;03m# interpreted in an implementation-defined manner, although more\u001b[39;00m\n\u001b[32m 246\u001b[39m \u001b[38;5;66;03m# than two leading slashes shall be treated as a single slash\".\u001b[39;00m\n\u001b[32m 247\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(part) - \u001b[38;5;28mlen\u001b[39m(stripped_part) == \u001b[32m2\u001b[39m:\n", + "\u001b[31mKeyboardInterrupt\u001b[39m: " + ] + } + ], + "source": [ + "samples = []\n", + "\n", + "print(f\"🔍 Сканирование директории {DATA_ROOT}...\")\n", + "# Используем os.walk, он часто работает быстрее rglob на сетевых дисках\n", + "for root, dirs, files in os.walk(DATA_ROOT):\n", + " for file in files:\n", + " if file.lower().endswith('.jpg'):\n", + " full_path = os.path.join(root, file)\n", + " # Извлекаем эмоцию (зависит от структуры папок, берем предпоследнюю папку)\n", + " # Путь: .../amusement/0/image.jpg -> root_parts[-2] будет 'amusement'\n", + " path_parts = Path(full_path).parts\n", + " emotion_folder = path_parts[-3].lower()\n", + " \n", + " if emotion_folder in EMO_MAP:\n", + " samples.append((full_path, EMO_MAP[emotion_folder]))\n", + "\n", + "print(f\"✅ Найдено изображений: {len(samples)}\")\n", + "\n", + "# САМЫЙ ВАЖНЫЙ ШАГ: Глобальное перемешивание перед упаковкой\n", + "print(\"🔀 Перемешиваем датасет...\")\n", + "random.shuffle(samples)\n", + "print(\"✅ Перемешивание завершено!\")" + ] + }, + { + 
"cell_type": "code", + "execution_count": null, + "id": "0fe71d72", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "📦 Подготовлено 205 задач (шардов).\n", + "💾 Целевая папка: ../../dataset/EmoSet-2.41M-shards\n", + "🚀 Запуск упаковки в 42 потоков...\n", + "\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Exception in thread Thread-4 (ui_thread_func):\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/threading.py\", line 1045, in _bootstrap_inner\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/threading.py\", line 982, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/tmp/ipykernel_16083/731719608.py\", line 72, in ui_thread_func\n", + " File \"/home/zin/projects/Thesis/.venv/lib/python3.11/site-packages/tqdm/notebook.py\", line 223, in __init__\n", + " super().__init__(*args, **kwargs)\n", + " File \"/home/zin/projects/Thesis/.venv/lib/python3.11/site-packages/tqdm/std.py\", line 1001, in __init__\n", + " raise (\n", + "tqdm.std.TqdmKeyError: \"Unknown argument(s): {'color': 'blue'}\"\n", + "Process ForkProcess-39:\n", + "Process ForkProcess-35:\n", + "Process ForkProcess-28:\n", + "Process ForkProcess-29:\n", + "Process ForkProcess-43:\n", + "Process ForkProcess-38:\n", + "Process ForkProcess-36:\n", + "Process ForkProcess-37:\n", + "Process ForkProcess-34:\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-27:\n", + "Process ForkProcess-25:\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-30:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 
314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Process ForkProcess-41:\n", + "Process ForkProcess-31:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Process ForkProcess-22:\n", + "Process ForkProcess-32:\n", + "Process ForkProcess-40:\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-42:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Process ForkProcess-20:\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-24:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Process ForkProcess-23:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in 
run\n", + " self._target(*self._args, **self._kwargs)\n", + "Process ForkProcess-26:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Process ForkProcess-21:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-16:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-13:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Process ForkProcess-9:\n", + "Process ForkProcess-17:\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Process ForkProcess-6:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Process ForkProcess-8:\n", 
+ "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Process ForkProcess-3:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-10:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Process ForkProcess-7:\n", + "Process ForkProcess-11:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-14:\n", + "Process ForkProcess-4:\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-5:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/threading.py\", line 982, in run\n", + "Process ForkProcess-2:\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-12:\n", + " File 
\"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-15:\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n", + "Process ForkProcess-18:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File 
\"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[3]\u001b[39m\u001b[32m, line 102\u001b[39m\n\u001b[32m 101\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m ProcessPoolExecutor(max_workers=NUM_WORKERS) \u001b[38;5;28;01mas\u001b[39;00m executor:\n\u001b[32m--> \u001b[39m\u001b[32m102\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43m_\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mexecutor\u001b[49m\u001b[43m.\u001b[49m\u001b[43mmap\u001b[49m\u001b[43m(\u001b[49m\u001b[43mbuild_shard\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtasks\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 103\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mpass\u001b[39;49;00m \u001b[38;5;66;03m# Просто ждем завершения всех задач\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py:620\u001b[39m, in \u001b[36m_chain_from_iterable_of_lists\u001b[39m\u001b[34m(iterable)\u001b[39m\n\u001b[32m 615\u001b[39m \u001b[38;5;250m\u001b[39m\u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m 616\u001b[39m \u001b[33;03mSpecialized implementation of itertools.chain.from_iterable.\u001b[39;00m\n\u001b[32m 617\u001b[39m \u001b[33;03mEach item in *iterable* should be a list. 
This function is\u001b[39;00m\n\u001b[32m 618\u001b[39m \u001b[33;03mcareful not to keep references to yielded objects.\u001b[39;00m\n\u001b[32m 619\u001b[39m \u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m--> \u001b[39m\u001b[32m620\u001b[39m \u001b[43m\u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43melement\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43miterable\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 621\u001b[39m \u001b[43m \u001b[49m\u001b[43melement\u001b[49m\u001b[43m.\u001b[49m\u001b[43mreverse\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:619\u001b[39m, in \u001b[36mExecutor.map..result_iterator\u001b[39m\u001b[34m()\u001b[39m\n\u001b[32m 618\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m619\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m \u001b[43m_result_or_cancel\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfs\u001b[49m\u001b[43m.\u001b[49m\u001b[43mpop\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 620\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:317\u001b[39m, in \u001b[36m_result_or_cancel\u001b[39m\u001b[34m(***failed resolving arguments***)\u001b[39m\n\u001b[32m 316\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m317\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mfut\u001b[49m\u001b[43m.\u001b[49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 318\u001b[39m \u001b[38;5;28;01mfinally\u001b[39;00m:\n", + "\u001b[36mFile 
\u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:451\u001b[39m, in \u001b[36mFuture.result\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 449\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m.__get_result()\n\u001b[32m--> \u001b[39m\u001b[32m451\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_condition\u001b[49m\u001b[43m.\u001b[49m\u001b[43mwait\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 453\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._state \u001b[38;5;129;01min\u001b[39;00m [CANCELLED, CANCELLED_AND_NOTIFIED]:\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/threading.py:327\u001b[39m, in \u001b[36mCondition.wait\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 326\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m327\u001b[39m \u001b[43mwaiter\u001b[49m\u001b[43m.\u001b[49m\u001b[43macquire\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 328\u001b[39m gotit = \u001b[38;5;28;01mTrue\u001b[39;00m\n", + "\u001b[31mKeyboardInterrupt\u001b[39m: ", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[31mKeyboardInterrupt\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[3]\u001b[39m\u001b[32m, line 101\u001b[39m\n\u001b[32m 99\u001b[39m \u001b[38;5;66;03m# 3. 
Запускаем 42 боевых ядра\u001b[39;00m\n\u001b[32m 100\u001b[39m tasks = [(i, chunk, queue) \u001b[38;5;28;01mfor\u001b[39;00m i, chunk \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28menumerate\u001b[39m(chunks)]\n\u001b[32m--> \u001b[39m\u001b[32m101\u001b[39m \u001b[43m\u001b[49m\u001b[38;5;28;43;01mwith\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mProcessPoolExecutor\u001b[49m\u001b[43m(\u001b[49m\u001b[43mmax_workers\u001b[49m\u001b[43m=\u001b[49m\u001b[43mNUM_WORKERS\u001b[49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mas\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mexecutor\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 102\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43m_\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mexecutor\u001b[49m\u001b[43m.\u001b[49m\u001b[43mmap\u001b[49m\u001b[43m(\u001b[49m\u001b[43mbuild_shard\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtasks\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 103\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mpass\u001b[39;49;00m \u001b[38;5;66;03m# Просто ждем завершения всех задач\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/_base.py:647\u001b[39m, in \u001b[36mExecutor.__exit__\u001b[39m\u001b[34m(self, exc_type, exc_val, exc_tb)\u001b[39m\n\u001b[32m 646\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m__exit__\u001b[39m(\u001b[38;5;28mself\u001b[39m, exc_type, exc_val, exc_tb):\n\u001b[32m--> \u001b[39m\u001b[32m647\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mshutdown\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwait\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m)\u001b[49m\n\u001b[32m 648\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m 
\u001b[38;5;28;01mFalse\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py:851\u001b[39m, in \u001b[36mProcessPoolExecutor.shutdown\u001b[39m\u001b[34m(self, wait, cancel_futures)\u001b[39m\n\u001b[32m 848\u001b[39m \u001b[38;5;28mself\u001b[39m._executor_manager_thread_wakeup.wakeup()\n\u001b[32m 850\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._executor_manager_thread \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mand\u001b[39;00m wait:\n\u001b[32m--> \u001b[39m\u001b[32m851\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_executor_manager_thread\u001b[49m\u001b[43m.\u001b[49m\u001b[43mjoin\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 852\u001b[39m \u001b[38;5;66;03m# To reduce the risk of opening too many files, remove references to\u001b[39;00m\n\u001b[32m 853\u001b[39m \u001b[38;5;66;03m# objects that use file descriptors.\u001b[39;00m\n\u001b[32m 854\u001b[39m \u001b[38;5;28mself\u001b[39m._executor_manager_thread = \u001b[38;5;28;01mNone\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/threading.py:1119\u001b[39m, in \u001b[36mThread.join\u001b[39m\u001b[34m(self, timeout)\u001b[39m\n\u001b[32m 1116\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(\u001b[33m\"\u001b[39m\u001b[33mcannot join current thread\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 1118\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1119\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_wait_for_tstate_lock\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1120\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 
1121\u001b[39m \u001b[38;5;66;03m# the behavior of a negative timeout isn't documented, but\u001b[39;00m\n\u001b[32m 1122\u001b[39m \u001b[38;5;66;03m# historically .join(timeout=x) for x<0 has acted as if timeout=0\u001b[39;00m\n\u001b[32m 1123\u001b[39m \u001b[38;5;28mself\u001b[39m._wait_for_tstate_lock(timeout=\u001b[38;5;28mmax\u001b[39m(timeout, \u001b[32m0\u001b[39m))\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/.pyenv/versions/3.11.7/lib/python3.11/threading.py:1139\u001b[39m, in \u001b[36mThread._wait_for_tstate_lock\u001b[39m\u001b[34m(self, block, timeout)\u001b[39m\n\u001b[32m 1136\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m\n\u001b[32m 1138\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1139\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[43mlock\u001b[49m\u001b[43m.\u001b[49m\u001b[43macquire\u001b[49m\u001b[43m(\u001b[49m\u001b[43mblock\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtimeout\u001b[49m\u001b[43m)\u001b[49m:\n\u001b[32m 1140\u001b[39m lock.release()\n\u001b[32m 1141\u001b[39m \u001b[38;5;28mself\u001b[39m._stop()\n", + "\u001b[31mKeyboardInterrupt\u001b[39m: " + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " 
with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "Process ForkProcess-19:\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " 
self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " 
File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in 
run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, 
**self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "Traceback (most recent call last):\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File 
\"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", 
+ " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File 
\"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 314, in _bootstrap\n", + " self.run()\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in 
__enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File 
\"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File 
\"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/process.py\", line 108, in run\n", + " self._target(*self._args, **self._kwargs)\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " 
^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 103, in get\n", + " res = 
self._recv_bytes()\n", + " ^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/concurrent/futures/process.py\", line 249, in _process_worker\n", + " call_item = call_queue.get(block=True)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in 
__enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/connection.py\", line 216, in recv_bytes\n", + " buf = self._recv_bytes(maxlength)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/queues.py\", line 102, in get\n", + " with self._rlock:\n", + 
"KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/connection.py\", line 430, in _recv_bytes\n", + " buf = self._recv(4)\n", + " ^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/synchronize.py\", line 95, in __enter__\n", + " return self._semlock.__enter__()\n", + " ^^^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n", + " File \"/home/zin/.pyenv/versions/3.11.7/lib/python3.11/multiprocessing/connection.py\", line 395, in _recv\n", + " chunk = read(handle, remaining)\n", + " ^^^^^^^^^^^^^^^^^^^^^^^\n", + "KeyboardInterrupt\n", + "KeyboardInterrupt\n" + ] + } + ], + "source": [ + "import multiprocessing as mp\n", + "from concurrent.futures import ProcessPoolExecutor\n", + "import webdataset as wds\n", + "from PIL import Image\n", + "import io\n", + "import threading\n", + "from pathlib import Path\n", + "\n", + "# Включаем красивую версию tqdm специально для Jupyter\n", + "from tqdm.notebook import tqdm\n", + "\n", + "# --- ПУТИ И НАСТРОЙКИ ---\n", + "SHARDS_DIR = Path(\"../../dataset/EmoSet-2.41M-shards\")\n", + "SHARDS_DIR.mkdir(parents=True, exist_ok=True)\n", + "\n", + "NUM_WORKERS = 42\n", + "MAX_SAMPLES_PER_SHARD = 10000\n", + "\n", + "# Дробим список на чанки\n", + "chunks = [samples[i:i + MAX_SAMPLES_PER_SHARD] for i in range(0, len(samples), 
MAX_SAMPLES_PER_SHARD)]\n", + "TOTAL_FILES = len(samples)\n", + "TOTAL_SHARDS = len(chunks)\n", + "\n", + "print(f\"📦 Подготовлено {TOTAL_SHARDS} задач (шардов).\")\n", + "print(f\"💾 Целевая папка: {SHARDS_DIR}\")\n", + "print(f\"🚀 Запуск упаковки в {NUM_WORKERS} потоков...\\n\")\n", + "\n", + "# --- ФУНКЦИЯ ДЛЯ ЯДЕР ПРОЦЕССОРА ---\n", + "def build_shard(args):\n", + " shard_idx, chunk, queue = args\n", + " shard_path = SHARDS_DIR / f\"emoset-{shard_idx:06d}.tar\"\n", + " \n", + " with wds.TarWriter(str(shard_path)) as sink:\n", + " for i, (img_path, label) in enumerate(chunk):\n", + " try:\n", + " # Магия сжатия\n", + " with Image.open(img_path) as img:\n", + " img = img.convert(\"RGB\")\n", + " img = img.resize((256, 256), Image.Resampling.BILINEAR)\n", + " with io.BytesIO() as img_byte_arr:\n", + " img.save(img_byte_arr, format='JPEG', quality=85)\n", + " image_data = img_byte_arr.getvalue()\n", + " \n", + " key = f\"{shard_idx:06d}_{i:05d}\"\n", + " sink.write({\n", + " \"__key__\": key,\n", + " \"jpg\": image_data,\n", + " \"cls\": label\n", + " })\n", + " \n", + " # Чтобы не перегружать очередь, отправляем отчет каждые 50 файлов\n", + " if (i + 1) % 50 == 0:\n", + " queue.put((\"file\", 50))\n", + " \n", + " except Exception:\n", + " # Если файл битый, всё равно считаем его \"пройденным\", чтобы бар не застрял\n", + " queue.put((\"file\", 1))\n", + " continue\n", + " \n", + " # Сообщаем об остатке файлов в чанке, которые не попали в % 50\n", + " remainder = len(chunk) % 50\n", + " if remainder != 0:\n", + " queue.put((\"file\", remainder))\n", + " \n", + " # Сообщаем, что целый шард готов\n", + " queue.put((\"shard\", 1))\n", + " return shard_idx\n", + "\n", + "# --- ФУНКЦИЯ ОТРИСОВКИ ИНТЕРФЕЙСА (Фоновый поток) ---\n", + "def ui_thread_func(q, total_files, total_shards):\n", + " # Создаем две красивые независимые полоски\n", + " pbar_files = tqdm(total=total_files, desc=\"🖼️ Сжато файлов\", color=\"blue\")\n", + " pbar_shards = tqdm(total=total_shards, 
desc=\"📦 Готово архивов\", color=\"green\")\n", + " \n", + " while True:\n", + " msg = q.get()\n", + " if msg == \"DONE\":\n", + " break\n", + " \n", + " msg_type, count = msg\n", + " if msg_type == \"file\":\n", + " pbar_files.update(count)\n", + " elif msg_type == \"shard\":\n", + " pbar_shards.update(count)\n", + " \n", + " pbar_files.close()\n", + " pbar_shards.close()\n", + "\n", + "# === ГЛАВНЫЙ ЗАПУСК ===\n", + "if __name__ == '__main__':\n", + " # 1. Создаем диспетчер очередей\n", + " manager = mp.Manager()\n", + " queue = manager.Queue()\n", + " \n", + " # 2. Запускаем фоновый поток отрисовки\n", + " ui_thread = threading.Thread(target=ui_thread_func, args=(queue, TOTAL_FILES, TOTAL_SHARDS))\n", + " ui_thread.start()\n", + " \n", + " # 3. Запускаем 42 боевых ядра\n", + " tasks = [(i, chunk, queue) for i, chunk in enumerate(chunks)]\n", + " with ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:\n", + " for _ in executor.map(build_shard, tasks):\n", + " pass # Просто ждем завершения всех задач\n", + " \n", + " # 4. Убиваем поток отрисовки и завершаем работу\n", + " queue.put(\"DONE\")\n", + " ui_thread.join()\n", + " \n", + " print(\"\\n🎉 ПАРАЛЛЕЛЬНАЯ УПАКОВКА И СЖАТИЕ ПОЛНОСТЬЮ ЗАВЕРШЕНЫ!\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python (my-python-project)", + "language": "python", + "name": "my-python-project" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}