diff --git a/.gitignore b/.gitignore index 3004b96..79399a2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ myenv +__pycache__ dups.txt alike.txt invalid.txt diff --git a/dedup.py b/dedup.py index 112ccfd..5041cb1 100644 --- a/dedup.py +++ b/dedup.py @@ -1,433 +1,12 @@ import signal -import threading -import os import sys -import xxhash import cv2 import numpy as np import time +# My Custom Library called delibs.py +import delibs -""" -Copyright 2025 - Robert Strutts MIT License - -Key Optimizations: - -Multi-Scale Processing: -First alignment at low resolution (faster) -Final refinement at full resolution (accurate) - -Matrix Scaling: -The translation components of the transformation matrix are scaled up -Rotation and scaling components remain the same - -Smart Downscaling: -Uses INTER_AREA interpolation which is ideal for size reduction -Maintains aspect ratio - -Performance Benefits: -Processing time scales with area, so 4x downscale = ~16x faster initial alignment -Memory usage significantly reduced -""" - -start = time.perf_counter() - -def kill_all(): - print("KILLING PROCESS") - os.kill(os.getpid(), signal.SIGKILL) # Force kernel-level termination - -def exit_handler(signum, frame): - threading.Thread(target=kill_all).start() # Run in separate thread -# CTRL+C will Exit NOW!!! -signal.signal(signal.SIGINT, exit_handler) - -def exit_timer(level): - end = time.perf_counter() - print(f"⏱ Execution took {end - start:.4f} seconds") - exit(level) - -class Timer: - def __init__(self, name=None): - self.name = name if name else "Timer" - self.start_time = None - self.end_time = None - - def __enter__(self): - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.stop() - self.print_result() - - def start(self): - self.start_time = time.perf_counter() - - def stop(self): - self.end_time = time.perf_counter() - - def elapsed(self): - if self.start_time is None: - raise ValueError("Timer has not been started") - if self.end_time is None: - return time.perf_counter() - self.start_time - return self.end_time - self.start_time - - def print_result(self): - elapsed = self.elapsed() - print(f"{self.name}: ⏱ {elapsed:.6f} seconds") - -def align_with_downscaling(img1, img2, downscale_factor=4, try_common_rotations=True): - """ - Aligns images using a multi-scale approach with initial downscaling - - Args: - img1: Reference image (numpy array) - img2: Image to align (numpy array) - downscale_factor: How much to reduce size for initial alignment (e.g., 4 = 1/4 size) - try_common_rotations: Whether to test common rotations first - - Returns: - aligned_img: Aligned version of img2 - transform_matrix: Final transformation matrix - rotation_angle: Detected simple rotation (None if not found) - """ - # 1. First alignment at low resolution - with Timer("1st alignment at Low Res-Downsaling"): - small1 = downscale_image(img1, downscale_factor) - small2 = downscale_image(img2, downscale_factor) - print("Done downscaling...") - print("Please wait...Rotation starting.") - # Get initial alignment at low resolution - with Timer("2nd alignment at Low Res-Rotations"): - _, init_matrix, rotation_angle = align_with_ecc_and_rotation( - small1, small2, try_common_rotations - ) - print("Done rotating low res image...") - if init_matrix is None: - return img2, None, None # Alignment failed - - # 2. 
Refine alignment at full resolution with initial estimate - # Apply the rotation if one was detected - if rotation_angle is not None: - img2 = rotate_image(img2, rotation_angle) - - with Timer("Scaling translation components"): - # Scale up the transformation matrix - full_matrix = init_matrix.copy() - full_matrix[:2, 2] *= downscale_factor # Scale translation components - print("Done scale-up/transform...") - with Timer("Convert images to grayscale"): - # Convert images to grayscale for final alignment - gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) - gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) - print("Done greyscale alignment...") - # Set criteria for final alignment - criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 500, 1e-6) - print("Please wait...ECC initial estimate.") - try: - with Timer("ECC init"): - # Run ECC with initial estimate - cc, full_matrix = cv2.findTransformECC( - gray1, gray2, full_matrix, cv2.MOTION_AFFINE, criteria - ) - - with Timer("Apply final transformation to color image"): - # Apply final transformation to color image - aligned_img = cv2.warpAffine( - img2, full_matrix, (img1.shape[1], img1.shape[0]), - flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP - ) - - return aligned_img, full_matrix, rotation_angle - except: - return img2, None, None - -def downscale_image(img, factor): - """Downscale image by specified factor while preserving aspect ratio""" - if factor <= 1: - return img.copy() - - height, width = img.shape[:2] - new_size = (int(width/factor), int(height/factor)) - - # Use area interpolation for downscaling (best for reduction) - return cv2.resize(img, new_size, interpolation=cv2.INTER_AREA) - -def rotate_image(image, angle): - """Rotate image by specified angle (0, 90, 180, or 270 degrees)""" - if angle == 90: - return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) - elif angle == 180: - return cv2.rotate(image, cv2.ROTATE_180) - elif angle == 270: - return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE) - return image - -def try_ecc_alignment(target, moving): - """Try ECC alignment and return aligned image, matrix, and correlation coefficient""" - # Initialize warp matrix - warp_matrix = np.eye(2, 3, dtype=np.float32) - - # Set criteria - criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6) - - try: - # Run ECC - cc, warp_matrix = cv2.findTransformECC( - target, moving, warp_matrix, cv2.MOTION_AFFINE, criteria - ) - - # Apply the transformation - aligned = cv2.warpAffine( - moving, warp_matrix, (target.shape[1], target.shape[0]), - flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP - ) - - return aligned, warp_matrix, cc - except: - return moving, None, 0 - -def apply_transform(image, matrix, target_shape): - """Apply transformation matrix to color image""" - if matrix is None: - return image - - if matrix.shape == (2, 3): # Affine - return cv2.warpAffine( - image, matrix, (target_shape[1], target_shape[0]), - flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP - ) - elif matrix.shape == (3, 3): # Homography - return cv2.warpPerspective( - image, matrix, (target_shape[1], target_shape[0]), - flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP - ) - return image - -def align_ecc(img1, img2): - # Convert to grayscale - gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) - gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) - - # Define motion model (affine or homography) - warp_mode = cv2.MOTION_AFFINE # or cv2.MOTION_HOMOGRAPHY - - if warp_mode == cv2.MOTION_HOMOGRAPHY: - warp_matrix = np.eye(3, 3, dtype=np.float32) - else: - warp_matrix = 
np.eye(2, 3, dtype=np.float32) - - # Specify termination criteria - criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6) - - # Run ECC - try: - cc, warp_matrix = cv2.findTransformECC( - gray1, gray2, warp_matrix, warp_mode, criteria - ) - - if warp_mode == cv2.MOTION_HOMOGRAPHY: - aligned_img = cv2.warpPerspective( - img2, warp_matrix, (img1.shape[1], img1.shape[0]) - ) - else: - aligned_img = cv2.warpAffine( - img2, warp_matrix, (img1.shape[1], img1.shape[0]) - ) - - return aligned_img, warp_matrix - except: - print("Alignment failed") - return img2, None - -def align_with_ecc_and_rotation(img1, img2, try_common_rotations=True): - """ - Aligns img2 to img1 using ECC, with optional pre-testing of common rotations - - Args: - img1: Reference image (numpy array) - img2: Image to align (numpy array) - try_common_rotations: If True, tests common rotations first - - Returns: - aligned_img: Aligned version of img2 - transform_matrix: Transformation matrix used - rotation_angle: Detected rotation angle (None if not a simple rotation) - """ - # Convert to grayscale for alignment - gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) - gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) - - if try_common_rotations: - # Test common rotations first - best_cc = -1 - best_aligned = None - best_matrix = None - best_angle = None - - for angle in [0, 90, 180, 270]: - # Rotate the image - rotated = rotate_image(gray2, angle) - - # Try ECC alignment - aligned, matrix, cc = try_ecc_alignment(gray1, rotated) - - if cc > best_cc: - best_cc = cc - best_aligned = aligned - best_matrix = matrix - best_angle = angle if angle != 0 else None - - if best_cc > 0.3: # Good enough alignment found - # Apply the same transformation to color image - if best_angle is not None: - rotated_color = rotate_image(img2, best_angle) - else: - rotated_color = img2 - - if best_matrix is not None: - aligned_color = apply_transform(rotated_color, best_matrix, img1.shape) - else: - aligned_color = rotated_color - - return aligned_color, best_matrix, best_angle - - # If no good rotation found or try_common_rotations=False, do regular ECC - aligned_img, transform_matrix = align_ecc(img1, img2) - return aligned_img, transform_matrix, None - -def matrix_similarity_score(matrix): - """ - Calculate similarity score based on deviation from identity matrix. - Returns 1 for perfect match (identity), decreasing towards 0 for large transformations. - """ - if matrix is None: - return 0.0 # Alignment failed - - # For affine matrix (2x3) - if matrix.shape == (2, 3): - ideal = np.eye(2, 3, dtype=np.float32) - # Normalize translation components by image dimensions (assuming 1000px as reference) - normalized_matrix = matrix.copy() - normalized_matrix[:, 2] /= 1000.0 - # For homography matrix (3x3) - elif matrix.shape == (3, 3): - ideal = np.eye(3, dtype=np.float32) - normalized_matrix = matrix.copy() - normalized_matrix[:, 2] /= 1000.0 # Normalize translation - else: - return 0.0 - - # Calculate Frobenius norm of difference - diff = np.linalg.norm(normalized_matrix - ideal) - - # Convert to similarity score (0-1) - score = np.exp(-diff) # Exponential decay - return float(np.clip(score, 0, 1)) - -def decomposed_similarity_score(matrix, img_width): - """ - Calculate score by analyzing translation, rotation, and scaling separately. - img_width is used to normalize translation to image dimensions. 
- """ - if matrix is None: - return 0.0 - - # Decompose affine matrix - if matrix.shape == (2, 3): - # Extract rotation and scale - a, b, c, d = matrix[0,0], matrix[0,1], matrix[1,0], matrix[1,1] - scale_x = np.sqrt(a*a + b*b) - scale_y = np.sqrt(c*c + d*d) - rotation = np.arctan2(-b, a) - - # Extract translation (normalized by image width) - tx = matrix[0,2] / img_width - ty = matrix[1,2] / img_width - else: - return 0.0 - - # Calculate penalties (adjust weights as needed) - translation_penalty = np.sqrt(tx*tx + ty*ty) * 0.5 # Weight translation more - scale_penalty = np.abs(scale_x - 1) + np.abs(scale_y - 1) - rotation_penalty = np.abs(rotation) / np.pi # Normalized to 0-1 - - # Combine penalties - total_penalty = translation_penalty + scale_penalty + rotation_penalty - - # Convert to similarity score - return max(0, 1 - total_penalty) - -def comprehensive_similarity(img1, img2, matrix): - """Combine matrix analysis with image comparison""" - # 1. Matrix-based score (50% weight) - matrix_score = matrix_similarity_score(matrix) - - # 2. Pixel-based score after alignment (50% weight) - if matrix is not None: - aligned = cv2.warpAffine(img2, matrix, (img1.shape[1], img1.shape[0])) - pixel_score = normalized_cross_correlation(img1, aligned) - else: - pixel_score = 0.0 - - return 0.5 * matrix_score + 0.5 * pixel_score - -def normalized_cross_correlation(img1, img2): - """Calculate NCC between two images""" - # Convert to grayscale - gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(np.float32) - gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(np.float32) - - # Normalize - gray1 = (gray1 - np.mean(gray1)) / (np.std(gray1) + 1e-8) - gray2 = (gray2 - np.mean(gray2)) / (np.std(gray2) + 1e-8) - - # Calculate correlation - return np.mean(gray1 * gray2) - -def find_duplicate_with_rotation(img1, img2): - # Initialize ORB detector - orb = cv2.ORB_create() - - # Find keypoints and descriptors - kp1, des1 = orb.detectAndCompute(img1, None) - kp2, des2 = orb.detectAndCompute(img2, None) - - # Create BFMatcher object - bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) - - # Match descriptors - matches = bf.match(des1, des2) - - # Sort matches by distance - matches = sorted(matches, key=lambda x: x.distance) - - # Return similarity score (lower is more similar) - return len(matches) - -def get_image_dimensions_cv(img): - if img is not None: - height, width = img.shape[:2] - return width, height - return None, None - -""" -xxhash is about 5–10x faster than SHA256, non-cryptographic. -If you want an even lighter setup (no installs), we can use zlib.crc32 instead — -but xxhash is better if you care about collisions! 
-""" -def quick_file_hash(file_path): - hasher = xxhash.xxh64() # 64-bit very fast hash - try: - with open(file_path, 'rb') as f: - while chunk := f.read(8192): # Read in 8KB chunks - hasher.update(chunk) - except Exception as e: - print(f"Error hashing file: {e}") - return hasher.hexdigest() - -# --- Example usage --- -if __name__ == "__main__": +def main(): if len(sys.argv) < 3: print("Usage: python3 dedup.py file1.jpg file2.jpg") sys.exit(3) @@ -435,43 +14,43 @@ if __name__ == "__main__": file1 = sys.argv[1] file2 = sys.argv[2] - with Timer("Hashing"): + with delibs.Timer("Hashing"): # Quick hashes - hash1 = quick_file_hash(file1) - hash2 = quick_file_hash(file2) + hash1 = delibs.quick_file_hash(file1) + hash2 = delibs.quick_file_hash(file2) if (hash1 == hash2): print("xxHash found duplicates") print("❌ Perfect match - images are identical - Duplicate Found!") print("No transformation needed") - exit_timer(1) + delibs.exit_timer(1) else: print("Done hashing...") - with Timer("Loading Images"): + with delibs.Timer("Loading Images"): # Load large images large_img1 = cv2.imread(file1) # e.g., 4000x3000 pixels large_img2 = cv2.imread(file2) # e.g., 4000x3000 pixels - w, h = get_image_dimensions_cv(large_img1) - w2, h2 = get_image_dimensions_cv(large_img2) + w, h = delibs.get_image_dimensions_cv(large_img1) + w2, h2 = delibs.get_image_dimensions_cv(large_img2) if w == None or w2 == None or h == None or h2 == None: print("❌Aborting❌...Invalid Image!") - exit_timer(8) + delibs.exit_timer(8) if w != w2 and w != h2: print("Diffent Resolutions") print("👌Not a Duplicate") - exit_timer(0) + delibs.exit_timer(0) if h != h2 and h != w2: print("Diffent Resolutions") print("👌Not a Duplicate") - exit_timer(0) + delibs.exit_timer(0) print("Done loading images...") - with Timer("Aligning with downscaling 1/4 size"): + with delibs.Timer("Aligning with downscaling 1/4 size"): # Align with downscaling (initially process at 1/4 size) - aligned, matrix, angle = align_with_downscaling( + aligned, matrix, angle = delibs.align_with_downscaling( large_img1, large_img2, downscale_factor=4, try_common_rotations=True @@ -485,7 +64,7 @@ if __name__ == "__main__": print(f"Final transformation matrix:\n{matrix}") # Calculate scores - matrix_score = matrix_similarity_score(matrix) + matrix_score = delibs.matrix_similarity_score(matrix) if len(sys.argv) > 3: is_score = sys.argv[3] @@ -494,16 +73,16 @@ if __name__ == "__main__": if matrix_score == 1.0 and is_score != "scores": print("❌ Perfect match score, images should be identical - Duplicate Found!") - exit_timer(1) + delibs.exit_timer(1) if matrix_score < 0.3 and is_score != "scores": print("👌Not a Duplicate, best guess!") - exit_timer(0) + delibs.exit_timer(0) if is_score == "scores": - score = find_duplicate_with_rotation(large_img1, aligned) + score = delibs.find_duplicate_with_rotation(large_img1, aligned) print(f"Score: {score}") - decomposed_score = decomposed_similarity_score(matrix, large_img1.shape[1]) - combined_score = comprehensive_similarity(large_img1, aligned, matrix) + decomposed_score = delibs.decomposed_similarity_score(matrix, large_img1.shape[1]) + combined_score = delibs.comprehensive_similarity(large_img1, aligned, matrix) # Check for perfect alignment if matrix_score == 1.0 and decomposed_score == 1.0 and combined_score == 1.0: @@ -520,7 +99,11 @@ if __name__ == "__main__": print(f"Matrix deviation score: {matrix_score:.4f}") print(f"Decomposed similarity: {decomposed_score:.4f}") print(f"Combined similarity: {combined_score:.4f}") - 
+    delibs.exit_timer(exit_code)
+
+# --- Example usage ---
+if __name__ == "__main__":
+    main()
 
 """
 Matrix-based scores are fast but don't consider image content
diff --git a/delibs.py b/delibs.py
new file mode 100644
index 0000000..0bf5e04
--- /dev/null
+++ b/delibs.py
@@ -0,0 +1,428 @@
+import signal
+import threading
+import os
+import sys
+import xxhash
+import cv2
+import numpy as np
+import time
+
+"""
+Copyright 2025 - Robert Strutts MIT License
+
+Key Optimizations:
+
+Multi-Scale Processing:
+First alignment at low resolution (faster)
+Final refinement at full resolution (accurate)
+
+Matrix Scaling:
+The translation components of the transformation matrix are scaled up
+Rotation and scaling components remain the same
+
+Smart Downscaling:
+Uses INTER_AREA interpolation which is ideal for size reduction
+Maintains aspect ratio
+
+Performance Benefits:
+Processing time scales with area, so 4x downscale = ~16x faster initial alignment
+Memory usage significantly reduced
+"""
+
+start = time.perf_counter()
+
+def kill_all():
+    print("KILLING PROCESS")
+    os.kill(os.getpid(), signal.SIGKILL)  # Force kernel-level termination
+
+def exit_handler(signum, frame):
+    threading.Thread(target=kill_all).start()  # Run in separate thread
+# CTRL+C will Exit NOW!!!
+signal.signal(signal.SIGINT, exit_handler)
+
+def exit_timer(level):
+    end = time.perf_counter()
+    print(f"⏱ Execution took {end - start:.4f} seconds")
+    sys.exit(level)
+
+class Timer:
+    def __init__(self, name=None):
+        self.name = name if name else "Timer"
+        self.start_time = None
+        self.end_time = None
+
+    def __enter__(self):
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop()
+        self.print_result()
+
+    def start(self):
+        self.start_time = time.perf_counter()
+
+    def stop(self):
+        self.end_time = time.perf_counter()
+
+    def elapsed(self):
+        if self.start_time is None:
+            raise ValueError("Timer has not been started")
+        if self.end_time is None:
+            return time.perf_counter() - self.start_time
+        return self.end_time - self.start_time
+
+    def print_result(self):
+        elapsed = self.elapsed()
+        print(f"{self.name}: ⏱ {elapsed:.6f} seconds")
+
+def align_with_downscaling(img1, img2, downscale_factor=4, try_common_rotations=True):
+    """
+    Aligns images using a multi-scale approach with initial downscaling
+
+    Args:
+        img1: Reference image (numpy array)
+        img2: Image to align (numpy array)
+        downscale_factor: How much to reduce size for initial alignment (e.g., 4 = 1/4 size)
+        try_common_rotations: Whether to test common rotations first
+
+    Returns:
+        aligned_img: Aligned version of img2
+        transform_matrix: Final transformation matrix
+        rotation_angle: Detected simple rotation (None if not found)
+    """
+    # 1. First alignment at low resolution
+    with Timer("1st alignment at Low Res-Downscaling"):
+        small1 = downscale_image(img1, downscale_factor)
+        small2 = downscale_image(img2, downscale_factor)
+        print("Done downscaling...")
+    print("Please wait...Rotation starting.")
+    # Get initial alignment at low resolution
+    with Timer("2nd alignment at Low Res-Rotations"):
+        _, init_matrix, rotation_angle = align_with_ecc_and_rotation(
+            small1, small2, try_common_rotations
+        )
+        print("Done rotating low res image...")
+    if init_matrix is None:
+        return img2, None, None  # Alignment failed
+
+    # 2. Refine alignment at full resolution with initial estimate
+    # Apply the rotation if one was detected
+    if rotation_angle is not None:
+        img2 = rotate_image(img2, rotation_angle)
+
+    with Timer("Scaling translation components"):
+        # Scale up the transformation matrix
+        full_matrix = init_matrix.copy()
+        full_matrix[:2, 2] *= downscale_factor  # Scale translation components
+        print("Done scale-up/transform...")
+    with Timer("Convert images to grayscale"):
+        # Convert images to grayscale for final alignment
+        gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
+        gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
+        print("Done grayscale conversion...")
+    # Set criteria for final alignment
+    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 500, 1e-6)
+    print("Please wait...ECC initial estimate.")
+    try:
+        with Timer("ECC init"):
+            # Run ECC with initial estimate
+            cc, full_matrix = cv2.findTransformECC(
+                gray1, gray2, full_matrix, cv2.MOTION_AFFINE, criteria
+            )
+
+        with Timer("Apply final transformation to color image"):
+            # Apply final transformation to color image
+            aligned_img = cv2.warpAffine(
+                img2, full_matrix, (img1.shape[1], img1.shape[0]),
+                flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
+            )
+
+        return aligned_img, full_matrix, rotation_angle
+    except cv2.error:  # findTransformECC raises cv2.error when it fails to converge
+        return img2, None, None
+
+def downscale_image(img, factor):
+    """Downscale image by specified factor while preserving aspect ratio"""
+    if factor <= 1:
+        return img.copy()
+
+    height, width = img.shape[:2]
+    new_size = (int(width/factor), int(height/factor))
+
+    # Use area interpolation for downscaling (best for reduction)
+    return cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)
+
+def rotate_image(image, angle):
+    """Rotate image by specified angle (0, 90, 180, or 270 degrees)"""
+    if angle == 90:
+        return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
+    elif angle == 180:
+        return cv2.rotate(image, cv2.ROTATE_180)
+    elif angle == 270:
+        return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
+    return image
+
+def try_ecc_alignment(target, moving):
+    """Try ECC alignment and return aligned image, matrix, and correlation coefficient"""
+    # Initialize warp matrix
+    warp_matrix = np.eye(2, 3, dtype=np.float32)
+
+    # Set criteria
+    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6)
+
+    try:
+        # Run ECC
+        cc, warp_matrix = cv2.findTransformECC(
+            target, moving, warp_matrix, cv2.MOTION_AFFINE, criteria
+        )
+
+        # Apply the transformation
+        aligned = cv2.warpAffine(
+            moving, warp_matrix, (target.shape[1], target.shape[0]),
+            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
+        )
+
+        return aligned, warp_matrix, cc
+    except cv2.error:
+        return moving, None, 0
+
+def apply_transform(image, matrix, target_shape):
+    """Apply transformation matrix to color image"""
+    if matrix is None:
+        return image
+
+    if matrix.shape == (2, 3):  # Affine
+        return cv2.warpAffine(
+            image, matrix, (target_shape[1], target_shape[0]),
+            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
+        )
+    elif matrix.shape == (3, 3):  # Homography
+        return cv2.warpPerspective(
+            image, matrix, (target_shape[1], target_shape[0]),
+            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
+        )
+    return image
+
+def align_ecc(img1, img2):
+    # Convert to grayscale
+    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
+    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
+
+    # Define motion model (affine or homography)
+    warp_mode = cv2.MOTION_AFFINE  # or cv2.MOTION_HOMOGRAPHY
+
+    if warp_mode == cv2.MOTION_HOMOGRAPHY:
+        warp_matrix = np.eye(3, 3, dtype=np.float32)
+    else:
+        warp_matrix = np.eye(2, 3, dtype=np.float32)
+
+    # Specify termination criteria
+    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6)
+
+    # Run ECC
+    try:
+        cc, warp_matrix = cv2.findTransformECC(
+            gray1, gray2, warp_matrix, warp_mode, criteria
+        )
+
+        if warp_mode == cv2.MOTION_HOMOGRAPHY:
+            aligned_img = cv2.warpPerspective(
+                img2, warp_matrix, (img1.shape[1], img1.shape[0])
+            )
+        else:
+            aligned_img = cv2.warpAffine(
+                img2, warp_matrix, (img1.shape[1], img1.shape[0])
+            )
+
+        return aligned_img, warp_matrix
+    except cv2.error:
+        print("Alignment failed")
+        return img2, None
+
+def align_with_ecc_and_rotation(img1, img2, try_common_rotations=True):
+    """
+    Aligns img2 to img1 using ECC, with optional pre-testing of common rotations
+
+    Args:
+        img1: Reference image (numpy array)
+        img2: Image to align (numpy array)
+        try_common_rotations: If True, tests common rotations first
+
+    Returns:
+        aligned_img: Aligned version of img2
+        transform_matrix: Transformation matrix used
+        rotation_angle: Detected rotation angle (None if not a simple rotation)
+    """
+    # Convert to grayscale for alignment
+    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
+    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
+
+    if try_common_rotations:
+        # Test common rotations first
+        best_cc = -1
+        best_aligned = None
+        best_matrix = None
+        best_angle = None
+
+        for angle in [0, 90, 180, 270]:
+            # Rotate the image
+            rotated = rotate_image(gray2, angle)
+
+            # Try ECC alignment
+            aligned, matrix, cc = try_ecc_alignment(gray1, rotated)
+
+            if cc > best_cc:
+                best_cc = cc
+                best_aligned = aligned
+                best_matrix = matrix
+                best_angle = angle if angle != 0 else None
+
+        if best_cc > 0.3:  # Good enough alignment found
+            # Apply the same transformation to color image
+            if best_angle is not None:
+                rotated_color = rotate_image(img2, best_angle)
+            else:
+                rotated_color = img2
+
+            if best_matrix is not None:
+                aligned_color = apply_transform(rotated_color, best_matrix, img1.shape)
+            else:
+                aligned_color = rotated_color
+
+            return aligned_color, best_matrix, best_angle
+
+    # If no good rotation found or try_common_rotations=False, do regular ECC
+    aligned_img, transform_matrix = align_ecc(img1, img2)
+    return aligned_img, transform_matrix, None
+
+def matrix_similarity_score(matrix):
+    """
+    Calculate similarity score based on deviation from identity matrix.
+    Returns 1 for perfect match (identity), decreasing towards 0 for large transformations.
+    """
+    if matrix is None:
+        return 0.0  # Alignment failed
+
+    # For affine matrix (2x3)
+    if matrix.shape == (2, 3):
+        ideal = np.eye(2, 3, dtype=np.float32)
+        # Normalize translation components by image dimensions (assuming 1000px as reference)
+        normalized_matrix = matrix.copy()
+        normalized_matrix[:, 2] /= 1000.0
+    # For homography matrix (3x3)
+    elif matrix.shape == (3, 3):
+        ideal = np.eye(3, dtype=np.float32)
+        normalized_matrix = matrix.copy()
+        normalized_matrix[:, 2] /= 1000.0  # Normalize translation
+    else:
+        return 0.0
+
+    # Calculate Frobenius norm of difference
+    diff = np.linalg.norm(normalized_matrix - ideal)
+
+    # Convert to similarity score (0-1)
+    score = np.exp(-diff)  # Exponential decay
+    return float(np.clip(score, 0, 1))
+
+def decomposed_similarity_score(matrix, img_width):
+    """
+    Calculate score by analyzing translation, rotation, and scaling separately.
+    img_width is used to normalize translation to image dimensions.
+    """
+    if matrix is None:
+        return 0.0
+
+    # Decompose affine matrix
+    if matrix.shape == (2, 3):
+        # Extract rotation and scale
+        a, b, c, d = matrix[0, 0], matrix[0, 1], matrix[1, 0], matrix[1, 1]
+        scale_x = np.sqrt(a*a + b*b)
+        scale_y = np.sqrt(c*c + d*d)
+        rotation = np.arctan2(-b, a)
+
+        # Extract translation (normalized by image width)
+        tx = matrix[0, 2] / img_width
+        ty = matrix[1, 2] / img_width
+    else:
+        return 0.0
+
+    # Calculate penalties (adjust weights as needed)
+    translation_penalty = np.sqrt(tx*tx + ty*ty) * 0.5  # Translation weighted at 0.5
+    scale_penalty = np.abs(scale_x - 1) + np.abs(scale_y - 1)
+    rotation_penalty = np.abs(rotation) / np.pi  # Normalized to 0-1
+
+    # Combine penalties
+    total_penalty = translation_penalty + scale_penalty + rotation_penalty
+
+    # Convert to similarity score
+    return max(0, 1 - total_penalty)
+
+def comprehensive_similarity(img1, img2, matrix):
+    """Combine matrix analysis with image comparison"""
+    # 1. Matrix-based score (50% weight)
+    matrix_score = matrix_similarity_score(matrix)
+
+    # 2. Pixel-based score after alignment (50% weight)
+    if matrix is not None:
+        aligned = cv2.warpAffine(img2, matrix, (img1.shape[1], img1.shape[0]))
+        pixel_score = normalized_cross_correlation(img1, aligned)
+    else:
+        pixel_score = 0.0
+
+    return 0.5 * matrix_score + 0.5 * pixel_score
+
+def normalized_cross_correlation(img1, img2):
+    """Calculate NCC between two images"""
+    # Convert to grayscale
+    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(np.float32)
+    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(np.float32)
+
+    # Normalize
+    gray1 = (gray1 - np.mean(gray1)) / (np.std(gray1) + 1e-8)
+    gray2 = (gray2 - np.mean(gray2)) / (np.std(gray2) + 1e-8)
+
+    # Calculate correlation
+    return np.mean(gray1 * gray2)
+
+def find_duplicate_with_rotation(img1, img2):
+    # Initialize ORB detector
+    orb = cv2.ORB_create()
+
+    # Find keypoints and descriptors
+    kp1, des1 = orb.detectAndCompute(img1, None)
+    kp2, des2 = orb.detectAndCompute(img2, None)
+
+    # Create BFMatcher object
+    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
+
+    # Match descriptors
+    matches = bf.match(des1, des2)
+
+    # Sort matches by distance
+    matches = sorted(matches, key=lambda x: x.distance)
+
+    # Return number of cross-checked matches (higher means more similar)
+    return len(matches)
+
+def get_image_dimensions_cv(img):
+    if img is not None:
+        height, width = img.shape[:2]
+        return width, height
+    return None, None
+
+"""
+xxhash is about 5–10x faster than SHA256, non-cryptographic.
+If you want an even lighter setup (no installs), we can use zlib.crc32 instead —
+but xxhash is better if you care about collisions!
+"""
+def quick_file_hash(file_path):
+    hasher = xxhash.xxh64()  # 64-bit very fast hash
+    try:
+        with open(file_path, 'rb') as f:
+            while chunk := f.read(8192):  # Read in 8KB chunks
+                hasher.update(chunk)
+    except OSError as e:
+        print(f"Error hashing file: {e}")
+        return None  # Do not return a digest of partial data
+    return hasher.hexdigest()
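
A note on the hash fallback: the docstring above mentions zlib.crc32 as a no-install alternative to xxhash. A minimal sketch of that variant (my illustration, not part of the diff) could reuse the same chunked-read pattern:

import zlib

def quick_file_crc32(file_path):
    """Stdlib-only fallback for quick_file_hash; weaker collision resistance."""
    crc = 0
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):  # same 8KB chunks as quick_file_hash
                crc = zlib.crc32(chunk, crc)  # carry the running CRC across chunks
    except OSError as e:
        print(f"Error hashing file: {e}")
        return None
    return format(crc, '08x')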
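
Since dedup.py now pulls everything from delibs, the module can also be driven on its own. A hypothetical usage sketch (the file names are placeholders, not from the repo):

import cv2
import delibs  # note: importing delibs also installs its SIGINT handler

h1 = delibs.quick_file_hash("a.jpg")  # hypothetical paths
h2 = delibs.quick_file_hash("b.jpg")
if h1 is not None and h1 == h2:
    print("byte-identical files, no alignment needed")
else:
    img1 = cv2.imread("a.jpg")
    img2 = cv2.imread("b.jpg")
    aligned, matrix, angle = delibs.align_with_downscaling(
        img1, img2, downscale_factor=4, try_common_rotations=True
    )
    print(delibs.matrix_similarity_score(matrix))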
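
The "Matrix Scaling" idea from the delibs docstring, worked through with made-up numbers: only the translation column of the 2x3 affine matrix is resolution-dependent, so it alone gets multiplied by the downscale factor.

import numpy as np

# Affine estimate from the 1/4-resolution pass (illustrative values):
# identity rotation/scale, translation of (10, 5) px in the small image.
init_matrix = np.array([[1.0, 0.0, 10.0],
                        [0.0, 1.0,  5.0]], dtype=np.float32)

full_matrix = init_matrix.copy()
full_matrix[:2, 2] *= 4  # same operation as in align_with_downscaling
print(full_matrix)       # translation is now (40, 20); the 2x2 block is unchanged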
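
And matrix_similarity_score by hand: the translation column is divided by the 1000 px reference, the Frobenius norm of the deviation from identity is taken, and exp(-diff) maps it into (0, 1].

import numpy as np
import delibs

print(delibs.matrix_similarity_score(np.eye(2, 3, dtype=np.float32)))  # 1.0, zero deviation

shifted = np.eye(2, 3, dtype=np.float32)
shifted[0, 2] = 100.0  # a 100 px shift becomes a normalized deviation of 0.1
print(delibs.matrix_similarity_score(shifted))  # exp(-0.1) ≈ 0.905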
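
Finally, a quick sanity check of the decomposition in decomposed_similarity_score: for a pure rotation the scale terms come out as 1, arctan2(-b, a) recovers the angle, and only the rotation penalty |angle|/pi is charged.

import numpy as np
import delibs

theta = np.deg2rad(10.0)  # illustrative 10-degree rotation, no translation
M = np.array([[np.cos(theta), -np.sin(theta), 0.0],
              [np.sin(theta),  np.cos(theta), 0.0]], dtype=np.float32)

a, b = M[0, 0], M[0, 1]
print(np.sqrt(a*a + b*b))             # scale_x == 1.0 for a pure rotation
print(np.rad2deg(np.arctan2(-b, a)))  # 10.0, the angle recovered
print(delibs.decomposed_similarity_score(M, 1000))  # 1 - 10/180 ≈ 0.944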