#!/usr/bin/env python3
import subprocess
import json
import sys
import sqlite3
from collections import Counter
from datetime import datetime
import os
import time
import concurrent.futures
import logging
from typing import Dict, Any, List, Tuple

# Configure error logging
ERROR_LOG = "photo_stats_errors.log"
logging.basicConfig(
    filename=ERROR_LOG,
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Credits message
CREDITS = (
    "Developed by @chema_photo - Follow me on Instagram and YouTube.\n"
    "More info about the script at chemaphoto.com\n"
)

# Define allowed extensions for RAW and JPEG
RAW_EXTENSIONS = {"cr2", "cr3", "nef", "arw", "raf", "dng", "rw2"}
JPEG_EXTENSIONS = {"jpg", "jpeg"}
ALLOWED_EXTENSIONS = RAW_EXTENSIONS.union(JPEG_EXTENSIONS)

# Database configuration
DB_FILE = "photo_stats_cache.db"
BATCH_SIZE = 500  # Adjust based on available memory


def format_time(seconds: float) -> str:
    """Convert seconds to a human-readable time format."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    minutes, seconds = divmod(seconds, 60)
    if minutes < 60:
        return f"{int(minutes)}m {int(seconds)}s"
    hours, minutes = divmod(minutes, 60)
    return f"{int(hours)}h {int(minutes)}m"


class ProgressTracker:
    """Utility class for tracking and displaying progress."""

    def __init__(self, total: int, label: str):
        self.start_time = time.time()
        self.total = total
        self.processed = 0
        self.label = label
        self.last_update = 0

    def update(self, increment: int = 1):
        self.processed += increment
        current_time = time.time()
        # Throttle redraws to roughly 10 per second, except for the final update.
        if current_time - self.last_update < 0.1 and self.processed != self.total:
            return
        self.last_update = current_time
        elapsed = current_time - self.start_time
        percent = self.processed / self.total * 100
        eta = (elapsed / self.processed) * (self.total - self.processed) if self.processed else 0
        bar_length = 40
        filled = int(bar_length * self.processed // self.total)
        bar = '█' * filled + ' ' * (bar_length - filled)
        sys.stdout.write(
            f"\r{self.label}: [{bar}] {percent:.1f}% | {self.processed}/{self.total} | "
            f"Elapsed: {format_time(elapsed)} | ETA: {format_time(eta)}"
        )
        sys.stdout.flush()

    def complete(self):
        elapsed = time.time() - self.start_time
        sys.stdout.write(f"\r{self.label}: Completed in {format_time(elapsed)}".ljust(100) + "\n")
        sys.stdout.flush()


def init_db() -> sqlite3.Connection:
    """Initialize the SQLite database with performance optimizations."""
    conn = sqlite3.connect(DB_FILE)
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA synchronous = NORMAL")
    conn.execute("PRAGMA cache_size = -10000")
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS metadata (
            source_file TEXT PRIMARY KEY,
            mod_time REAL,
            DateTimeOriginal TEXT,
            Model TEXT,
            LensModel TEXT,
            ISO TEXT,
            ExposureTime TEXT,
            FNumber TEXT,
            FocalLength TEXT,
            Flash TEXT,
            WhiteBalance TEXT,
            ImageWidth TEXT,
            ImageHeight TEXT,
            FocalLengthIn35mmFormat TEXT
        )
    ''')
    conn.commit()
    return conn


def update_cache(conn, file_path, mod_time, metadata):
    """Update (or insert) the cache record for the given file."""
    c = conn.cursor()
    c.execute('''
        INSERT OR REPLACE INTO metadata (
            source_file, mod_time, DateTimeOriginal, Model, LensModel, ISO,
            ExposureTime, FNumber, FocalLength, Flash, WhiteBalance,
            ImageWidth, ImageHeight, FocalLengthIn35mmFormat
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        file_path,
        mod_time,
        metadata.get("DateTimeOriginal"),
        metadata.get("Model"),
        metadata.get("LensModel"),
        str(metadata.get("ISO")) if metadata.get("ISO") is not None else None,
        str(metadata.get("ExposureTime")) if metadata.get("ExposureTime") is not None else None,
        str(metadata.get("FNumber")) if metadata.get("FNumber") is not None else None,
        normalize_focal_length(metadata.get("FocalLength")),
        metadata.get("Flash"),
        metadata.get("WhiteBalance"),
        metadata.get("ImageWidth"),
        metadata.get("ImageHeight"),
        metadata.get("FocalLengthIn35mmFormat")
    ))
    conn.commit()
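# Usage sketch for the cache layer (illustrative only; the path, mtime and tag
# values below are hypothetical):
#   conn = init_db()
#   update_cache(conn, "/photos/IMG_0001.CR2", 1700000000.0,
#                {"Model": "Canon EOS R5", "ISO": 100, "FocalLength": "85.0 mm"})
#   get_cached_metadata(conn, "/photos/IMG_0001.CR2", 1700000000.0)  # -> dict
#   get_cached_metadata(conn, "/photos/IMG_0001.CR2", 1700009999.0)  # -> None (stale)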
def get_cached_metadata(conn, file_path, current_mod_time):
    """Return cached metadata if the file exists in cache and has not changed."""
    c = conn.cursor()
    c.execute('''
        SELECT mod_time, DateTimeOriginal, Model, LensModel, ISO, ExposureTime,
               FNumber, FocalLength, Flash, WhiteBalance, ImageWidth,
               ImageHeight, FocalLengthIn35mmFormat
        FROM metadata WHERE source_file=?
    ''', (file_path,))
    row = c.fetchone()
    if row and float(row[0]) == current_mod_time:
        return {
            "DateTimeOriginal": row[1],
            "Model": row[2],
            "LensModel": row[3],
            "ISO": row[4],
            "ExposureTime": row[5],
            "FNumber": row[6],
            "FocalLength": normalize_focal_length(row[7]),
            "Flash": row[8],
            "WhiteBalance": row[9],
            "ImageWidth": row[10],
            "ImageHeight": row[11],
            "FocalLengthIn35mmFormat": row[12]
        }
    return None


def run_exiftool(file_path):
    """Run ExifTool on a single file and return the metadata dictionary."""
    cmd = ["exiftool", "-json", file_path]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
        if data and isinstance(data, list):
            return data[0]
    except Exception as e:
        print(f"Error running ExifTool on {file_path}: {e}")
    return {}


def run_exiftool_batch(file_paths: List[str]) -> List[Dict[str, Any]]:
    """Process multiple files in one ExifTool call with error suppression."""
    if not file_paths:
        return []
    try:
        # -q suppresses ExifTool's informational messages. capture_output=True
        # already captures stderr; passing stderr= as well would raise ValueError.
        cmd = ["exiftool", "-json", "-q", "-fast"] + file_paths
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except Exception as e:
        logging.error(f"ExifTool error: {str(e)}")
        logging.error(f"Failed files: {file_paths}")
        return []


def get_bulk_cached_metadata(conn: sqlite3.Connection,
                             file_paths: List[str]) -> Dict[str, Tuple[float, Dict[str, Any]]]:
    """Fetch cached metadata for multiple files at once."""
    if not file_paths:
        return {}
    c = conn.cursor()
    placeholders = ','.join(['?'] * len(file_paths))
    query = f'''
        SELECT source_file, mod_time, DateTimeOriginal, Model, LensModel, ISO,
               ExposureTime, FNumber, FocalLength, Flash, WhiteBalance,
               ImageWidth, ImageHeight, FocalLengthIn35mmFormat
        FROM metadata WHERE source_file IN ({placeholders})
    '''
    c.execute(query, file_paths)
    return {
        row[0]: (row[1], {
            "DateTimeOriginal": row[2],
            "Model": row[3],
            "LensModel": row[4],
            "ISO": row[5],
            "ExposureTime": row[6],
            "FNumber": row[7],
            "FocalLength": normalize_focal_length(row[8]),
            "Flash": row[9],
            "WhiteBalance": row[10],
            "ImageWidth": row[11],
            "ImageHeight": row[12],
            "FocalLengthIn35mmFormat": row[13]
        })
        for row in c.fetchall()
    }


def update_cache_bulk(conn: sqlite3.Connection, file_data: Dict[str, Tuple[float, Dict[str, Any]]]):
    """Bulk update the cache using executemany."""
    if not file_data:
        return
    c = conn.cursor()
    data_tuples = []
    for file_path, (mod_time, metadata) in file_data.items():
        data_tuples.append((
            file_path,
            mod_time,
            metadata.get("DateTimeOriginal"),
            metadata.get("Model"),
            metadata.get("LensModel"),
            str(metadata.get("ISO")) if metadata.get("ISO") is not None else None,
            str(metadata.get("ExposureTime")) if metadata.get("ExposureTime") is not None else None,
            str(metadata.get("FNumber")) if metadata.get("FNumber") is not None else None,
            normalize_focal_length(metadata.get("FocalLength")),
            metadata.get("Flash"),
            metadata.get("WhiteBalance"),
            metadata.get("ImageWidth"),
            metadata.get("ImageHeight"),
            metadata.get("FocalLengthIn35mmFormat")
        ))
    c.executemany('''
        INSERT OR REPLACE INTO metadata
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', data_tuples)
    conn.commit()
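# get_bulk_cached_metadata returns {source_file: (mod_time, metadata)}, e.g.
# (hypothetical): {"/photos/IMG_0001.CR2": (1700000000.0, {"Model": "Canon EOS R5", ...})}.
# Note that SQLite caps the number of "?" placeholders per statement (999 in
# older builds), so lookups should stay at or below BATCH_SIZE paths per call.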
def process_files_optimized(file_list: List[str], conn: sqlite3.Connection) -> List[Dict[str, Any]]:
    """Process files in optimized batches with caching."""
    all_metadata = []
    total_files = len(file_list)
    progress = ProgressTracker(total_files, "Processing files")

    for i in range(0, len(file_list), BATCH_SIZE):
        batch = file_list[i:i + BATCH_SIZE]
        current_mod_times = {fp: os.path.getmtime(fp) for fp in batch}
        cached_data = get_bulk_cached_metadata(conn, batch)

        # Only files that are new or whose modification time changed go to ExifTool.
        to_process = []
        for fp in batch:
            if fp not in cached_data or cached_data[fp][0] != current_mod_times[fp]:
                to_process.append(fp)

        new_metadata = {}
        if to_process:
            exif_results = run_exiftool_batch(to_process)
            new_metadata = {m["SourceFile"]: m for m in exif_results}
            update_data = {fp: (current_mod_times[fp], new_metadata[fp])
                           for fp in to_process if fp in new_metadata}
            if update_data:
                update_cache_bulk(conn, update_data)

        for fp in batch:
            if fp in cached_data and cached_data[fp][0] == current_mod_times[fp]:
                meta = cached_data[fp][1]
            else:
                meta = new_metadata.get(fp, {})
            meta["SourceFile"] = fp
            all_metadata.append(meta)

        progress.update(len(batch))

    progress.complete()
    return all_metadata


def normalize_focal_length(focal) -> str:
    """
    Normalize focal length values.
    For example, converts "85.0 mm" and "85 mm" both to "85 mm".
    """
    if not focal:
        return ""
    try:
        # ExifTool may report a string ("85.0 mm") or a bare number, so coerce
        # to str first, remove 'mm' if present, then convert to float.
        focal = str(focal).lower().replace("mm", "").strip()
        value = float(focal)
        if value.is_integer():
            return f"{int(value)} mm"
        return f"{value:.1f} mm"
    except ValueError:
        return focal


# Mapping for white balance normalization
WB_MAPPING = {
    "auto": "auto",
    "auto (ambience priority)": "auto",
    "daylight": "daylight",
    "cloudy": "cloudy",
    "fluorescent": "fluorescent",
    "tungsten": "tungsten",
    "shade": "shade",
    "manual": "manual",
    "manual temperature (kelvin)": "manual"
    # Other values are handled in normalize_white_balance below.
}


def normalize_white_balance(wb: str) -> str:
    """
    Normalize white balance values.
    Any value that starts with "unknown" or is "custom" is mapped to "manual".
    Otherwise, the mapping dictionary is applied.
    """
    if not wb:
        return "manual"
    wb_norm = wb.strip().lower()
    if wb_norm.startswith("unknown") or wb_norm == "custom":
        return "manual"
    return WB_MAPPING.get(wb_norm, wb_norm)
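# Illustrative normalizations (hypothetical inputs):
#   normalize_focal_length("85.0 mm")  -> "85 mm"
#   normalize_focal_length("23.5mm")   -> "23.5 mm"
#   normalize_white_balance("Auto (ambience priority)") -> "auto"
#   normalize_white_balance("Unknown (0x44)")           -> "manual"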
""" if not wb: return "manual" wb_norm = wb.strip().lower() if wb_norm.startswith("unknown") or wb_norm == "custom": return "manual" return WB_MAPPING.get(wb_norm, wb_norm) def process_directory_chunk(root: str, files: List[str]) -> Dict[Tuple[str, str], str]: """Process a directory chunk in parallel, grouping files by (root, base filename).""" local_group = {} for f in files: ext = f.split('.')[-1].lower() if ext in ALLOWED_EXTENSIONS: full_path = os.path.join(root, f) base_name = os.path.splitext(f)[0] key = (root, base_name) if key in local_group: existing_ext = os.path.splitext(local_group[key])[1][1:].lower() new_ext = os.path.splitext(full_path)[1][1:].lower() if existing_ext in JPEG_EXTENSIONS and new_ext in RAW_EXTENSIONS: local_group[key] = full_path else: local_group[key] = full_path return local_group def print_counter(title: str, counter: Counter, formatter=lambda x: x, threshold: int = 3): """ Print statistics for a counter. Only display keys with count >= threshold individually; group others into "Other (= threshold} other_total = sum(v for k, v in counter.items() if v < threshold) print(f"=== {title} ===") for item, count in sorted(main_items.items(), key=lambda x: -x[1]): print(f"{formatter(item)}: {count} photos") if other_total > 0: print(f"Other (<{threshold}): {other_total} photos") print() def process_directory(directory: str): """ Main directory processing with parallel scanning. Groups files by (directory, base filename) so that if both RAW and JPEG exist in the same folder, the RAW file is preferred. Then, uses caching and optimized batch processing to extract metadata and compute statistics. """ grouped_files: Dict[Tuple[str, str], str] = {} print(f"Starting directory scan: {directory}") start_time = time.time() # Count total directories for progress tracking dir_count = sum(1 for _ in os.walk(directory)) dir_progress = ProgressTracker(dir_count, "Scanning directories") with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for root, _, files in os.walk(directory): futures.append(executor.submit(process_directory_chunk, root, files)) for future in concurrent.futures.as_completed(futures): dir_progress.update() chunk_group = future.result() for key, path in chunk_group.items(): if key in grouped_files: existing_ext = os.path.splitext(os.path.basename(grouped_files[key]))[1][1:].lower() new_ext = os.path.splitext(path)[1][1:].lower() if existing_ext in JPEG_EXTENSIONS and new_ext in RAW_EXTENSIONS: grouped_files[key] = path else: grouped_files[key] = path dir_progress.complete() total_photos = len(grouped_files) scan_time = time.time() - start_time print(f"\nTotal photos to process (after grouping by folder and base name): {total_photos} (scan took {format_time(scan_time)})") conn = init_db() file_list = list(grouped_files.values()) metadata_list = process_files_optimized(file_list, conn) conn.close() # Initialize counters counters = { 'year': Counter(), 'month': Counter(), 'camera': Counter(), 'lens': Counter(), 'iso': Counter(), 'shutter': Counter(), 'aperture': Counter(), 'focal': Counter(), 'flash': Counter(), 'white_balance': Counter(), 'resolution': Counter(), 'focal35': Counter() } print("\nAnalyzing statistics...") start_analysis = time.time() for item in metadata_list: # Year/Month date_str = item.get("DateTimeOriginal") if date_str: try: dt = datetime.strptime(date_str, "%Y:%m:%d %H:%M:%S") counters['year'][dt.year] += 1 counters['month'][dt.month] += 1 except Exception: pass # Camera if model := item.get("Model"): 
    for item in metadata_list:
        # Year/Month
        date_str = item.get("DateTimeOriginal")
        if date_str:
            try:
                dt = datetime.strptime(date_str, "%Y:%m:%d %H:%M:%S")
                counters['year'][dt.year] += 1
                counters['month'][dt.month] += 1
            except Exception:
                pass
        # Camera
        if model := item.get("Model"):
            counters['camera'][model.strip()] += 1
        # Lens
        if lens := item.get("LensModel"):
            counters['lens'][lens.strip()] += 1
        # ISO
        if iso := item.get("ISO"):
            counters['iso'][str(iso)] += 1
        # Shutter speed
        if shutter := item.get("ExposureTime"):
            counters['shutter'][str(shutter)] += 1
        # Aperture
        if aperture := item.get("FNumber"):
            counters['aperture'][str(aperture)] += 1
        # Focal length (normalized)
        if focal := item.get("FocalLength"):
            counters['focal'][normalize_focal_length(focal)] += 1
        # Flash status
        if flash := item.get("Flash"):
            counters['flash'][str(flash)] += 1
        # White balance (normalized)
        if wb := item.get("WhiteBalance"):
            counters['white_balance'][normalize_white_balance(wb)] += 1
        # Resolution
        if (width := item.get("ImageWidth")) and (height := item.get("ImageHeight")):
            counters['resolution'][f"{width}x{height}"] += 1
        # Focal length in 35mm format
        if focal35 := item.get("FocalLengthIn35mmFormat"):
            counters['focal35'][str(focal35)] += 1

    analysis_time = time.time() - start_analysis
    print(f"Analysis completed in {format_time(analysis_time)}\n")

    # Group rare resolutions into an "Other" bucket
    resolution_counter = counters['resolution']
    common_resolutions = {res: count for res, count in resolution_counter.items() if count >= 10}
    other_resolutions = sum(count for res, count in resolution_counter.items() if count < 10)
    if other_resolutions > 0:
        common_resolutions["Other (<10)"] = other_resolutions
    counters['resolution'] = common_resolutions

    print_counter("Year Statistics", counters['year'], lambda y: f"Year {y}")
    print_counter("Month Statistics", counters['month'], lambda m: f"Month {m}")
    print_counter("Camera Models", counters['camera'])
    print_counter("Lens Models", counters['lens'])
    print_counter("ISO Statistics", counters['iso'], lambda iso: f"ISO {iso}")
    print_counter("Shutter Speed Statistics", counters['shutter'], lambda s: f"ExposureTime {s}")
    print_counter("Aperture Statistics", counters['aperture'], lambda a: f"FNumber {a}")
    print_counter("Focal Length Statistics", counters['focal'], lambda f: f"Focal Length {f}")
    print_counter("Flash Statistics", counters['flash'], lambda f: f"Flash {f}")
    print_counter("White Balance Statistics", counters['white_balance'], lambda wb: f"White Balance {wb}")
    print_counter("Resolution Statistics", counters['resolution'])
    print_counter("Focal Length (35mm Format) Statistics", counters['focal35'],
                  lambda f: f"Focal Length (35mm) {f}")
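# Example of the report format printed by print_counter (counts are hypothetical):
#   === Camera Models ===
#   Canon EOS R5: 812 photos
#   Canon EOS 6D: 143 photos
#   Other (<3): 4 photos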
def main():
    # Print credits at the beginning
    print(CREDITS)
    if len(sys.argv) < 2:
        directory = os.getcwd()
        print(f"No directory specified. Using current directory: {directory}")
    else:
        directory = sys.argv[1]
    try:
        start_time = time.time()
        process_directory(directory)
        total_time = time.time() - start_time
        print(f"\nTotal processing time: {format_time(total_time)}")
    except KeyboardInterrupt:
        print("\nProcessing aborted by user")
        sys.exit(1)
    except Exception:
        logging.exception("Unexpected error in main")
        print(f"\nUnexpected error occurred. See {ERROR_LOG} for details.")
        sys.exit(1)
    # Print credits at the end
    print(CREDITS)


if __name__ == "__main__":
    # Suppress stray stderr output; errors are still written to ERROR_LOG via logging.
    sys.stderr = open(os.devnull, 'w')
    main()
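# Usage (the script name is whatever this file is saved as, e.g. photo_stats.py):
#   python3 photo_stats.py /path/to/photos
# Without an argument, the current working directory is scanned. Metadata is
# cached in photo_stats_cache.db in the working directory, so re-runs only
# re-read files whose modification time has changed.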