import signal import threading import os import sys import xxhash import cv2 import numpy as np import time """ Copyright 2025 - Robert Strutts MIT License Key Optimizations: Multi-Scale Processing: First alignment at low resolution (faster) Final refinement at full resolution (accurate) Matrix Scaling: The translation components of the transformation matrix are scaled up Rotation and scaling components remain the same Smart Downscaling: Uses INTER_AREA interpolation which is ideal for size reduction Maintains aspect ratio Performance Benefits: Processing time scales with area, so 4x downscale = ~16x faster initial alignment Memory usage significantly reduced """ start = time.perf_counter() def kill_all(): print("KILLING PROCESS") os.kill(os.getpid(), signal.SIGKILL) # Force kernel-level termination def exit_handler(signum, frame): threading.Thread(target=kill_all).start() # Run in separate thread # CTRL+C will Exit NOW!!! signal.signal(signal.SIGINT, exit_handler) def exit_timer(level): end = time.perf_counter() print(f"Execution took {end - start:.4f} seconds") exit(level) class Timer: def __init__(self, name=None): self.name = name if name else "Timer" self.start_time = None self.end_time = None def __enter__(self): self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop() self.print_result() def start(self): self.start_time = time.perf_counter() def stop(self): self.end_time = time.perf_counter() def elapsed(self): if self.start_time is None: raise ValueError("Timer has not been started") if self.end_time is None: return time.perf_counter() - self.start_time return self.end_time - self.start_time def print_result(self): elapsed = self.elapsed() print(f"{self.name}: {elapsed:.6f} seconds") def align_with_downscaling(img1, img2, downscale_factor=4, try_common_rotations=True): """ Aligns images using a multi-scale approach with initial downscaling Args: img1: Reference image (numpy array) img2: Image to align (numpy array) downscale_factor: How much to reduce size for initial alignment (e.g., 4 = 1/4 size) try_common_rotations: Whether to test common rotations first Returns: aligned_img: Aligned version of img2 transform_matrix: Final transformation matrix rotation_angle: Detected simple rotation (None if not found) """ # 1. First alignment at low resolution with Timer("1st alignment at Low Res-Downsaling"): small1 = downscale_image(img1, downscale_factor) small2 = downscale_image(img2, downscale_factor) print("Done downscaling...") print("Please wait...Rotation starting.") # Get initial alignment at low resolution with Timer("2nd alignment at Low Res-Rotations"): _, init_matrix, rotation_angle = align_with_ecc_and_rotation( small1, small2, try_common_rotations ) print("Done rotating low res image...") if init_matrix is None: return img2, None, None # Alignment failed # 2. Refine alignment at full resolution with initial estimate # Apply the rotation if one was detected if rotation_angle is not None: img2 = rotate_image(img2, rotation_angle) with Timer("Scaling translation components"): # Scale up the transformation matrix full_matrix = init_matrix.copy() full_matrix[:2, 2] *= downscale_factor # Scale translation components print("Done scale-up/transform...") with Timer("Convert images to grayscale"): # Convert images to grayscale for final alignment gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) print("Done greyscale alignment...") # Set criteria for final alignment criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 500, 1e-6) print("Please wait...ECC initial estimate.") try: with Timer("ECC init"): # Run ECC with initial estimate cc, full_matrix = cv2.findTransformECC( gray1, gray2, full_matrix, cv2.MOTION_AFFINE, criteria ) with Timer("Apply final transformation to color image"): # Apply final transformation to color image aligned_img = cv2.warpAffine( img2, full_matrix, (img1.shape[1], img1.shape[0]), flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP ) return aligned_img, full_matrix, rotation_angle except: return img2, None, None def downscale_image(img, factor): """Downscale image by specified factor while preserving aspect ratio""" if factor <= 1: return img.copy() height, width = img.shape[:2] new_size = (int(width/factor), int(height/factor)) # Use area interpolation for downscaling (best for reduction) return cv2.resize(img, new_size, interpolation=cv2.INTER_AREA) def rotate_image(image, angle): """Rotate image by specified angle (0, 90, 180, or 270 degrees)""" if angle == 90: return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) elif angle == 180: return cv2.rotate(image, cv2.ROTATE_180) elif angle == 270: return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE) return image def try_ecc_alignment(target, moving): """Try ECC alignment and return aligned image, matrix, and correlation coefficient""" # Initialize warp matrix warp_matrix = np.eye(2, 3, dtype=np.float32) # Set criteria criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6) try: # Run ECC cc, warp_matrix = cv2.findTransformECC( target, moving, warp_matrix, cv2.MOTION_AFFINE, criteria ) # Apply the transformation aligned = cv2.warpAffine( moving, warp_matrix, (target.shape[1], target.shape[0]), flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP ) return aligned, warp_matrix, cc except: return moving, None, 0 def apply_transform(image, matrix, target_shape): """Apply transformation matrix to color image""" if matrix is None: return image if matrix.shape == (2, 3): # Affine return cv2.warpAffine( image, matrix, (target_shape[1], target_shape[0]), flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP ) elif matrix.shape == (3, 3): # Homography return cv2.warpPerspective( image, matrix, (target_shape[1], target_shape[0]), flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP ) return image def align_ecc(img1, img2): # Convert to grayscale gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) # Define motion model (affine or homography) warp_mode = cv2.MOTION_AFFINE # or cv2.MOTION_HOMOGRAPHY if warp_mode == cv2.MOTION_HOMOGRAPHY: warp_matrix = np.eye(3, 3, dtype=np.float32) else: warp_matrix = np.eye(2, 3, dtype=np.float32) # Specify termination criteria criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6) # Run ECC try: cc, warp_matrix = cv2.findTransformECC( gray1, gray2, warp_matrix, warp_mode, criteria ) if warp_mode == cv2.MOTION_HOMOGRAPHY: aligned_img = cv2.warpPerspective( img2, warp_matrix, (img1.shape[1], img1.shape[0]) ) else: aligned_img = cv2.warpAffine( img2, warp_matrix, (img1.shape[1], img1.shape[0]) ) return aligned_img, warp_matrix except: print("Alignment failed") return img2, None def align_with_ecc_and_rotation(img1, img2, try_common_rotations=True): """ Aligns img2 to img1 using ECC, with optional pre-testing of common rotations Args: img1: Reference image (numpy array) img2: Image to align (numpy array) try_common_rotations: If True, tests common rotations first Returns: aligned_img: Aligned version of img2 transform_matrix: Transformation matrix used rotation_angle: Detected rotation angle (None if not a simple rotation) """ # Convert to grayscale for alignment gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) if try_common_rotations: # Test common rotations first best_cc = -1 best_aligned = None best_matrix = None best_angle = None for angle in [0, 90, 180, 270]: # Rotate the image rotated = rotate_image(gray2, angle) # Try ECC alignment aligned, matrix, cc = try_ecc_alignment(gray1, rotated) if cc > best_cc: best_cc = cc best_aligned = aligned best_matrix = matrix best_angle = angle if angle != 0 else None if best_cc > 0.3: # Good enough alignment found # Apply the same transformation to color image if best_angle is not None: rotated_color = rotate_image(img2, best_angle) else: rotated_color = img2 if best_matrix is not None: aligned_color = apply_transform(rotated_color, best_matrix, img1.shape) else: aligned_color = rotated_color return aligned_color, best_matrix, best_angle # If no good rotation found or try_common_rotations=False, do regular ECC aligned_img, transform_matrix = align_ecc(img1, img2) return aligned_img, transform_matrix, None def matrix_similarity_score(matrix): """ Calculate similarity score based on deviation from identity matrix. Returns 1 for perfect match (identity), decreasing towards 0 for large transformations. """ if matrix is None: return 0.0 # Alignment failed # For affine matrix (2x3) if matrix.shape == (2, 3): ideal = np.eye(2, 3, dtype=np.float32) # Normalize translation components by image dimensions (assuming 1000px as reference) normalized_matrix = matrix.copy() normalized_matrix[:, 2] /= 1000.0 # For homography matrix (3x3) elif matrix.shape == (3, 3): ideal = np.eye(3, dtype=np.float32) normalized_matrix = matrix.copy() normalized_matrix[:, 2] /= 1000.0 # Normalize translation else: return 0.0 # Calculate Frobenius norm of difference diff = np.linalg.norm(normalized_matrix - ideal) # Convert to similarity score (0-1) score = np.exp(-diff) # Exponential decay return float(np.clip(score, 0, 1)) def decomposed_similarity_score(matrix, img_width): """ Calculate score by analyzing translation, rotation, and scaling separately. img_width is used to normalize translation to image dimensions. """ if matrix is None: return 0.0 # Decompose affine matrix if matrix.shape == (2, 3): # Extract rotation and scale a, b, c, d = matrix[0,0], matrix[0,1], matrix[1,0], matrix[1,1] scale_x = np.sqrt(a*a + b*b) scale_y = np.sqrt(c*c + d*d) rotation = np.arctan2(-b, a) # Extract translation (normalized by image width) tx = matrix[0,2] / img_width ty = matrix[1,2] / img_width else: return 0.0 # Calculate penalties (adjust weights as needed) translation_penalty = np.sqrt(tx*tx + ty*ty) * 0.5 # Weight translation more scale_penalty = np.abs(scale_x - 1) + np.abs(scale_y - 1) rotation_penalty = np.abs(rotation) / np.pi # Normalized to 0-1 # Combine penalties total_penalty = translation_penalty + scale_penalty + rotation_penalty # Convert to similarity score return max(0, 1 - total_penalty) def comprehensive_similarity(img1, img2, matrix): """Combine matrix analysis with image comparison""" # 1. Matrix-based score (50% weight) matrix_score = matrix_similarity_score(matrix) # 2. Pixel-based score after alignment (50% weight) if matrix is not None: aligned = cv2.warpAffine(img2, matrix, (img1.shape[1], img1.shape[0])) pixel_score = normalized_cross_correlation(img1, aligned) else: pixel_score = 0.0 return 0.5 * matrix_score + 0.5 * pixel_score def normalized_cross_correlation(img1, img2): """Calculate NCC between two images""" # Convert to grayscale gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(np.float32) gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(np.float32) # Normalize gray1 = (gray1 - np.mean(gray1)) / (np.std(gray1) + 1e-8) gray2 = (gray2 - np.mean(gray2)) / (np.std(gray2) + 1e-8) # Calculate correlation return np.mean(gray1 * gray2) def find_duplicate_with_rotation(img1, img2): # Initialize ORB detector orb = cv2.ORB_create() # Find keypoints and descriptors kp1, des1 = orb.detectAndCompute(img1, None) kp2, des2 = orb.detectAndCompute(img2, None) # Create BFMatcher object bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) # Match descriptors matches = bf.match(des1, des2) # Sort matches by distance matches = sorted(matches, key=lambda x: x.distance) # Return similarity score (lower is more similar) return len(matches) def get_image_dimensions_cv(img): if img is not None: height, width = img.shape[:2] return width, height return None, None """ xxhash is about 5–10x faster than SHA256, non-cryptographic. If you want an even lighter setup (no installs), we can use zlib.crc32 instead — but xxhash is better if you care about collisions! """ def quick_file_hash(file_path): hasher = xxhash.xxh64() # 64-bit very fast hash try: with open(file_path, 'rb') as f: while chunk := f.read(8192): # Read in 8KB chunks hasher.update(chunk) except Exception as e: print(f"Error hashing file: {e}") return hasher.hexdigest() # --- Example usage --- if __name__ == "__main__": if len(sys.argv) < 3: print("Usage: python3 dedup.py file1.jpg file2.jpg") sys.exit(1) file1 = sys.argv[1] file2 = sys.argv[2] with Timer("Hashing"): # Quick hashes hash1 = quick_file_hash(file1) hash2 = quick_file_hash(file2) if (hash1 == hash2): print("xxHash found duplicates") print("✅ Perfect match - images are identical") print("No transformation needed") exit_timer(1) else: print("Done hashing...") with Timer("Loading Images"): # Load large images large_img1 = cv2.imread(file1) # e.g., 4000x3000 pixels large_img2 = cv2.imread(file2) # e.g., 4000x3000 pixels w, h = get_image_dimensions_cv(large_img1) w2, h2 = get_image_dimensions_cv(large_img2) if w == None or w2 == None or h == None or h2 == None: print("Aborting...Invalid Image!") exit_timer(8) if w != w2 and w != h2: print("Diffent Resolutions") exit_timer(0) if h != h2 and h != w2: print("Diffent Resolutions") exit_timer(0) print("Done loading images...") with Timer("Aligning with downscaling 1/4 size"): # Align with downscaling (initially process at 1/4 size) aligned, matrix, angle = align_with_downscaling( large_img1, large_img2, downscale_factor=4, try_common_rotations=True ) # Save result # cv2.imwrite('aligned_large.jpg', aligned) # Print debug info print(f"Detected rotation: {angle}°") print(f"Final transformation matrix:\n{matrix}") # Calculate scores matrix_score = matrix_similarity_score(matrix) if len(sys.argv) > 3: is_score = sys.argv[3] else: is_score = "" if matrix_score == 1.0 and is_score != "scores": print("✅ Perfect Matrix score, should be identical") exit_timer(1) if matrix_score < 0.3 and is_score != "scores": print("❌ Significant transformation required") exit_timer(0) if is_score == "scores": score = find_duplicate_with_rotation(large_img1, aligned) print(f"Score: {score}") decomposed_score = decomposed_similarity_score(matrix, large_img1.shape[1]) combined_score = comprehensive_similarity(large_img1, aligned, matrix) # Check for perfect alignment if matrix_score == 1.0 and decomposed_score == 1.0 and combined_score == 1.0: print("✅ Perfect match - images are identical") print("No transformation needed") exit_code = 1 elif matrix_score > 0.9 and decomposed_score > 0.9 and combined_score > 0.7: print("⚠️ Near-perfect alignment - minor differences detected") exit_code = 2 else: print("❌ Significant transformation required") exit_code = 0 print(f"Matrix deviation score: {matrix_score:.4f}") print(f"Decomposed similarity: {decomposed_score:.4f}") print(f"Combined similarity: {combined_score:.4f}") exit_timer(exit_code) """ Matrix-based scores are fast but don't consider image content Decomposed analysis gives more interpretable results (separate rotation/scale/translation) Combined approaches with pixel comparison are most accurate but slower Normalization is crucial - translation should be relative to image size """