ᕕ( ᐛ )ᕗ Wu555's blog

Image processing between two CNNs

实现了一个对物体进行图像分割,提取感兴趣区域,并进行遮挡恢复和图像合并的流水线处理

从指定文件夹中遍历所有子文件夹;对每个子文件夹中的 metadata.csv 文件进行解析;根据面积阈值筛选图片并保存到输出目录。

遍历 img_selected 输出目录中的所有子文件夹;对每个子文件夹,读取原始图像,并用子文件夹中的遮挡图像来恢复被遮挡的区域;将恢复后的图像保存到输出目录。

遍历 img_recovered 目录中的所有子文件夹;对每个子文件夹,将其中的所有图像叠加合并为一张图片;将合并后的图片保存到输出目录。

调用上述三个函数,完成图像选择、恢复和合并的完整流水线;允许通过参数配置输入输出文件夹等。

import pandas as pd
import os
from PIL import Image
import cv2
import numpy as np
from rich.progress import Progress, BarColumn, SpinnerColumn, TimeElapsedColumn, TimeRemainingColumn
import time

def wu555_progress():
    """Build the progress bar layout used by the pipeline stages in this file.

    Returns:
        A new ``Progress`` instance whose columns are, in order: the task
        description with a ``completed/total`` counter, a spinner, a bar,
        the percentage, the elapsed time, the remaining time and a trailing
        "Wu~" tag.
    """
    columns = [
        "[progress.description]{task.description}({task.completed}/{task.total})",
        SpinnerColumn(finished_text="[green]✔"),
        BarColumn(),
        "[progress.percentage]{task.percentage:>3.2f}%",
        "[yellow]⏱ ",
        TimeElapsedColumn(),
        "[cyan]⏳",
        TimeRemainingColumn(),
        "[red]Wu~",
    ]
    return Progress(*columns)

def img_selected(folder_path, lower_threshold, upper_threshold, selected_output_folder):
    """Copy the images whose area lies strictly between two thresholds.

    Each sub-folder of ``folder_path`` is expected to contain a
    ``metadata.csv`` with ``id`` and ``area`` columns, next to images named
    ``<id>.png``. For every row with
    ``lower_threshold < area < upper_threshold`` the matching image is
    re-saved under ``selected_output_folder/<sub-folder name>/``.

    Sub-folders without a ``metadata.csv`` and rows whose image file is
    missing are skipped and reported at the end instead of aborting the run.

    Args:
        folder_path: Directory whose sub-folders are scanned.
        lower_threshold: Exclusive lower bound on ``area``.
        upper_threshold: Exclusive upper bound on ``area``.
        selected_output_folder: Root directory for the selected images.
    """
    sequence_folders = os.listdir(folder_path)
    skipped = []
    progress = wu555_progress()
    task = progress.add_task("[cyan]Processing...", total=len(sequence_folders))

    with progress:
        for sequence_folder in sequence_folders:
            # Advance first so plain files in folder_path still count toward the total.
            progress.update(task, advance=1)
            folder = os.path.join(folder_path, sequence_folder)
            if not os.path.isdir(folder):
                continue

            csv_file = os.path.join(folder, 'metadata.csv')
            if not os.path.isfile(csv_file):
                skipped.append(csv_file)
                continue
            df = pd.read_csv(csv_file)

            output_directory = os.path.join(selected_output_folder, sequence_folder)
            os.makedirs(output_directory, exist_ok=True)

            # Vectorised form of `lower_threshold < area < upper_threshold`.
            in_range = (df['area'] > lower_threshold) & (df['area'] < upper_threshold)
            for image_id in df.loc[in_range, 'id']:
                image_filename = str(int(image_id)) + '.png'
                image_path = os.path.join(folder, image_filename)
                if not os.path.isfile(image_path):
                    skipped.append(image_path)
                    continue
                # The context manager closes the file handle that Image.open keeps.
                with Image.open(image_path) as image:
                    image.save(os.path.join(output_directory, image_filename))

    for path in skipped:
        print(f'Skipped (missing): {path}')
    print('All selected images have been saved')

def img_recovered(img_selected_folder, recovered_output_folder, original_image_folder):
    """Fill the pure-white pixels of each selected image with the original image.

    For every sub-folder ``<name>`` of ``img_selected_folder`` the image
    ``original_image_folder/<name>.jpg`` is loaded. Each ``.png`` in the
    sub-folder is converted to grayscale; wherever the gray value is 255 the
    pixel is replaced by the pixel at the same position in the original
    image. The result is written to
    ``recovered_output_folder/<name>/<same file name>``.

    Sub-folders whose original image cannot be read, and images that cannot
    be read or whose shape differs from the original, are skipped and
    reported at the end.

    Args:
        img_selected_folder: Root directory holding one sub-folder of PNGs
            per original image.
        recovered_output_folder: Root directory for the recovered images.
        original_image_folder: Directory containing the ``<name>.jpg``
            originals.
    """
    os.makedirs(recovered_output_folder, exist_ok=True)

    # List once so the progress total and the iteration agree.
    folder_names = os.listdir(img_selected_folder)
    skipped = []
    progress = wu555_progress()
    task = progress.add_task("[cyan]Processing...", total=len(folder_names))

    with progress:
        for folder_name in folder_names:
            progress.update(task, advance=1)
            folder_path = os.path.join(img_selected_folder, folder_name)
            if not os.path.isdir(folder_path):
                continue

            output_subfolder = os.path.join(recovered_output_folder, folder_name)
            os.makedirs(output_subfolder, exist_ok=True)

            original_image_path = os.path.join(original_image_folder, folder_name + '.jpg')
            original_image = cv2.imread(original_image_path)
            # cv2.imread returns None instead of raising; check once per folder
            # rather than once per image.
            if original_image is None:
                skipped.append(original_image_path)
                continue

            for filename in os.listdir(folder_path):
                if not filename.endswith('.png'):
                    continue

                masked_image_path = os.path.join(folder_path, filename)
                masked_image = cv2.imread(masked_image_path)
                # An unreadable file or a size mismatch would make the boolean
                # indexing below fail, so skip and report it.
                if masked_image is None or masked_image.shape != original_image.shape:
                    skipped.append(masked_image_path)
                    continue

                masked_gray = cv2.cvtColor(masked_image, cv2.COLOR_BGR2GRAY)
                white = masked_gray == 255
                restored_image = masked_image.copy()
                restored_image[white] = original_image[white]
                cv2.imwrite(os.path.join(output_subfolder, filename), restored_image)

    for path in skipped:
        print(f'Skipped (unreadable or size mismatch): {path}')
    print('All recovered images have been saved')
    
def img_merged(recovered_output_folder, merged_output_folder):
    """Overlay all images of each sub-folder into a single image.

    For every sub-folder ``<name>`` of ``recovered_output_folder`` the
    ``.jpg``/``.png`` files are layered onto a black canvas: each image's
    pixels whose grayscale value is non-zero overwrite the canvas. The result
    is written to ``merged_output_folder/<name>.jpg``.

    Files are processed in sorted name order so the result does not depend on
    directory listing order. Sub-folders without any readable image, images
    that cannot be read, and images whose shape differs from the first
    readable image of their folder are skipped and reported at the end.

    Args:
        recovered_output_folder: Root directory holding one sub-folder of
            images per output image.
        merged_output_folder: Directory receiving the merged ``.jpg`` files;
            created if it does not exist.
    """
    subfolder_paths = [
        os.path.join(recovered_output_folder, subfolder)
        for subfolder in os.listdir(recovered_output_folder)
        if os.path.isdir(os.path.join(recovered_output_folder, subfolder))
    ]
    os.makedirs(merged_output_folder, exist_ok=True)

    skipped = []
    progress = wu555_progress()
    task = progress.add_task("[cyan]Processing...", total=len(subfolder_paths))

    with progress:
        for subfolder_path in subfolder_paths:
            progress.update(task, advance=1)
            image_paths = sorted(
                os.path.join(subfolder_path, filename)
                for filename in os.listdir(subfolder_path)
                if filename.endswith((".jpg", ".png"))
            )

            target_image = None
            for image_path in image_paths:
                image = cv2.imread(image_path)
                if image is None:
                    skipped.append(image_path)
                    continue

                if target_image is None:
                    # The first readable image fixes the canvas size.
                    target_height, target_width, _ = image.shape
                    target_image = np.zeros((target_height, target_width, 3), np.uint8)
                elif image.shape != target_image.shape:
                    skipped.append(image_path)
                    continue

                mask = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) != 0
                target_image[mask] = image[mask]

            # An empty folder, or one with only unreadable files, has nothing to merge.
            if target_image is None:
                skipped.append(subfolder_path)
                continue

            target_image_name = os.path.basename(subfolder_path) + ".jpg"
            target_image_path = os.path.join(merged_output_folder, target_image_name)
            cv2.imwrite(target_image_path, target_image)

    for path in skipped:
        print(f'Skipped (empty, unreadable or size mismatch): {path}')
    print('All merged images have been saved')

def main(
    folder_path='segment-anything/outputimg',
    lower_threshold=40000,
    upper_threshold=80000,
    selected_output_folder='img_selected',
    original_image_folder='segment-anything/inputimg',
    recovered_output_folder='img_recovered',
    merged_output_folder="img_merged",
):
    """Run the select -> recover -> merge pipeline.

    Calling ``main()`` without arguments uses the defaults below, which are
    the values that used to be hard-coded in this function.

    Args:
        folder_path: Directory whose sub-folders hold ``metadata.csv`` and
            the images to select from.
        lower_threshold: Exclusive lower bound on ``area`` for selection.
        upper_threshold: Exclusive upper bound on ``area`` for selection.
        selected_output_folder: Output of the selection stage and input of
            the recovery stage.
        original_image_folder: Directory with the original images read by
            the recovery stage.
        recovered_output_folder: Output of the recovery stage and input of
            the merge stage.
        merged_output_folder: Output directory of the merge stage.
    """
    img_selected(folder_path, lower_threshold, upper_threshold, selected_output_folder)
    img_recovered(selected_output_folder, recovered_output_folder, original_image_folder)
    img_merged(recovered_output_folder, merged_output_folder)

# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()