"""
Copyright 2025 Robert Strutts. MIT License.

Key optimizations:

Multi-scale processing:
    - First alignment at low resolution (fast)
    - Final refinement at full resolution (accurate)

Matrix scaling:
    - The translation components of the transformation matrix are scaled up
    - The rotation and scaling components remain the same

Smart downscaling:
    - Uses INTER_AREA interpolation, which is ideal for size reduction
    - Maintains aspect ratio

Performance benefits:
    - Processing time scales with area, so a 4x downscale gives a ~16x faster
      initial alignment
    - Memory usage is significantly reduced
"""

import sys

import cv2
import numpy as np
import xxhash

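# Worked example of the matrix-scaling step described above (illustration
# only, not used by the code): an affine estimate found at 1/4 resolution,
#     [[1.0, 0.0, 10.0],
#      [0.0, 1.0,  5.0]]
# maps to full resolution by multiplying only the translation column by 4:
#     [[1.0, 0.0, 40.0],
#      [0.0, 1.0, 20.0]]
# The rotation/scale block is resolution-independent, so it is left untouched.
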
def align_with_downscaling(img1, img2, downscale_factor=4, try_common_rotations=True):
    """
    Aligns images using a multi-scale approach with initial downscaling.

    Args:
        img1: Reference image (numpy array)
        img2: Image to align (numpy array)
        downscale_factor: How much to reduce size for initial alignment
            (e.g., 4 = 1/4 size)
        try_common_rotations: Whether to test common rotations first

    Returns:
        aligned_img: Aligned version of img2
        transform_matrix: Final transformation matrix
        rotation_angle: Detected simple rotation (None if not found)
    """
    # 1. First alignment at low resolution
    small1 = downscale_image(img1, downscale_factor)
    small2 = downscale_image(img2, downscale_factor)

    # Get initial alignment at low resolution
    _, init_matrix, rotation_angle = align_with_ecc_and_rotation(
        small1, small2, try_common_rotations
    )

    if init_matrix is None:
        return img2, None, None  # Alignment failed

    # 2. Refine alignment at full resolution using the low-res estimate.
    # Apply the rotation first if one was detected (the low-res matrix was
    # estimated against the rotated image).
    if rotation_angle is not None:
        img2 = rotate_image(img2, rotation_angle)

    # Scale up the transformation matrix: only the translation column depends
    # on resolution; the rotation/scale components stay the same.
    full_matrix = init_matrix.copy()
    full_matrix[:2, 2] *= downscale_factor

    # Convert images to grayscale for the final alignment
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    # Termination criteria for the final alignment
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 500, 1e-6)

    try:
        # Run ECC, seeded with the scaled-up initial estimate
        cc, full_matrix = cv2.findTransformECC(
            gray1, gray2, full_matrix, cv2.MOTION_AFFINE, criteria
        )

        # Apply the final transformation to the color image
        aligned_img = cv2.warpAffine(
            img2, full_matrix, (img1.shape[1], img1.shape[0]),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
        )

        return aligned_img, full_matrix, rotation_angle
    except cv2.error:
        return img2, None, None

def downscale_image(img, factor):
    """Downscale an image by the given factor while preserving aspect ratio."""
    if factor <= 1:
        return img.copy()

    height, width = img.shape[:2]
    new_size = (int(width / factor), int(height / factor))

    # Use area interpolation for downscaling (best for size reduction)
    return cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)

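# Example: with factor=4, a 4000x3000 image is resized to 1000x750 before the
# initial alignment pass, roughly 1/16 of the original pixel count.
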
def rotate_image(image, angle):
    """Rotate an image by 0, 90, 180, or 270 degrees."""
    if angle == 90:
        return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
    elif angle == 180:
        return cv2.rotate(image, cv2.ROTATE_180)
    elif angle == 270:
        return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
    return image

def try_ecc_alignment(target, moving):
    """Try ECC alignment; return the aligned image, matrix, and correlation coefficient."""
    # Initialize the warp matrix to the identity transform
    warp_matrix = np.eye(2, 3, dtype=np.float32)

    # Termination criteria
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6)

    try:
        # Run ECC
        cc, warp_matrix = cv2.findTransformECC(
            target, moving, warp_matrix, cv2.MOTION_AFFINE, criteria
        )

        # Apply the transformation
        aligned = cv2.warpAffine(
            moving, warp_matrix, (target.shape[1], target.shape[0]),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
        )

        return aligned, warp_matrix, cc
    except cv2.error:
        return moving, None, 0

def apply_transform(image, matrix, target_shape):
    """Apply a transformation matrix to a color image."""
    if matrix is None:
        return image

    if matrix.shape == (2, 3):  # Affine
        return cv2.warpAffine(
            image, matrix, (target_shape[1], target_shape[0]),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
        )
    elif matrix.shape == (3, 3):  # Homography
        return cv2.warpPerspective(
            image, matrix, (target_shape[1], target_shape[0]),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
        )
    return image

def align_ecc(img1, img2):
    """Align img2 to img1 with ECC using an affine (or homography) motion model."""
    # Convert to grayscale
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    # Define the motion model (affine or homography)
    warp_mode = cv2.MOTION_AFFINE  # or cv2.MOTION_HOMOGRAPHY

    if warp_mode == cv2.MOTION_HOMOGRAPHY:
        warp_matrix = np.eye(3, 3, dtype=np.float32)
    else:
        warp_matrix = np.eye(2, 3, dtype=np.float32)

    # Specify termination criteria
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6)

    # Run ECC
    try:
        cc, warp_matrix = cv2.findTransformECC(
            gray1, gray2, warp_matrix, warp_mode, criteria
        )

        if warp_mode == cv2.MOTION_HOMOGRAPHY:
            aligned_img = cv2.warpPerspective(
                img2, warp_matrix, (img1.shape[1], img1.shape[0])
            )
        else:
            aligned_img = cv2.warpAffine(
                img2, warp_matrix, (img1.shape[1], img1.shape[0])
            )

        return aligned_img, warp_matrix
    except cv2.error:
        print("Alignment failed")
        return img2, None

def align_with_ecc_and_rotation(img1, img2, try_common_rotations=True):
    """
    Aligns img2 to img1 using ECC, optionally pre-testing common rotations.

    Args:
        img1: Reference image (numpy array)
        img2: Image to align (numpy array)
        try_common_rotations: If True, tests common rotations first

    Returns:
        aligned_img: Aligned version of img2
        transform_matrix: Transformation matrix used
        rotation_angle: Detected rotation angle (None if not a simple rotation)
    """
    # Convert to grayscale for alignment
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    if try_common_rotations:
        # Test common rotations first and keep the best ECC correlation
        best_cc = -1
        best_matrix = None
        best_angle = None

        for angle in [0, 90, 180, 270]:
            # Rotate the image
            rotated = rotate_image(gray2, angle)

            # Try ECC alignment
            aligned, matrix, cc = try_ecc_alignment(gray1, rotated)

            if cc > best_cc:
                best_cc = cc
                best_matrix = matrix
                best_angle = angle if angle != 0 else None

        if best_cc > 0.3:  # Good enough alignment found
            # Apply the same transformation to the color image
            if best_angle is not None:
                rotated_color = rotate_image(img2, best_angle)
            else:
                rotated_color = img2

            if best_matrix is not None:
                aligned_color = apply_transform(rotated_color, best_matrix, img1.shape)
            else:
                aligned_color = rotated_color

            return aligned_color, best_matrix, best_angle

    # If no good rotation was found or try_common_rotations=False, run plain ECC
    aligned_img, transform_matrix = align_ecc(img1, img2)
    return aligned_img, transform_matrix, None

def matrix_similarity_score(matrix):
    """
    Calculate a similarity score from the transform's deviation from the
    identity matrix. Returns 1 for a perfect match (identity), decreasing
    towards 0 for large transformations.
    """
    if matrix is None:
        return 0.0  # Alignment failed

    # For an affine matrix (2x3)
    if matrix.shape == (2, 3):
        ideal = np.eye(2, 3, dtype=np.float32)
        # Normalize translation components (assuming 1000 px as a reference size)
        normalized_matrix = matrix.copy()
        normalized_matrix[:, 2] /= 1000.0
    # For a homography matrix (3x3)
    elif matrix.shape == (3, 3):
        ideal = np.eye(3, dtype=np.float32)
        normalized_matrix = matrix.copy()
        # Only the tx/ty entries are translations; leave the bottom row
        # (perspective terms and the trailing 1) untouched
        normalized_matrix[:2, 2] /= 1000.0
    else:
        return 0.0

    # Frobenius norm of the difference from identity
    diff = np.linalg.norm(normalized_matrix - ideal)

    # Convert to a similarity score in [0, 1] via exponential decay
    score = np.exp(-diff)
    return float(np.clip(score, 0, 1))

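# Worked example (illustration only): for a pure 100 px translation on the
# assumed 1000 px reference scale, the normalized deviation from identity is
# 100/1000 = 0.1, giving a score of exp(-0.1) ~= 0.905; the identity matrix
# itself scores exp(0) = 1.0.
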
def decomposed_similarity_score(matrix, img_width):
    """
    Calculate a score by analyzing translation, rotation, and scaling separately.
    img_width is used to normalize translation to the image dimensions.
    """
    if matrix is None:
        return 0.0

    # Decompose the affine matrix
    if matrix.shape == (2, 3):
        # Extract rotation and scale
        a, b, c, d = matrix[0, 0], matrix[0, 1], matrix[1, 0], matrix[1, 1]
        scale_x = np.sqrt(a * a + b * b)
        scale_y = np.sqrt(c * c + d * d)
        rotation = np.arctan2(-b, a)

        # Extract translation (normalized by image width)
        tx = matrix[0, 2] / img_width
        ty = matrix[1, 2] / img_width
    else:
        return 0.0

    # Calculate penalties (adjust weights as needed)
    translation_penalty = np.sqrt(tx * tx + ty * ty) * 0.5  # translation weight
    scale_penalty = np.abs(scale_x - 1) + np.abs(scale_y - 1)
    rotation_penalty = np.abs(rotation) / np.pi  # Normalized to 0-1

    # Combine penalties
    total_penalty = translation_penalty + scale_penalty + rotation_penalty

    # Convert to a similarity score
    return max(0, 1 - total_penalty)

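# Worked example (illustration only): a pure 50 px shift on a 1000 px wide
# image gives tx = 0.05 and ty = 0, so the translation penalty is
# sqrt(0.05^2) * 0.5 = 0.025; with no rotation or scaling, the score is
# 1 - 0.025 = 0.975.
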
def comprehensive_similarity(img1, img2, matrix):
    """Combine matrix analysis with direct image comparison."""
    # 1. Matrix-based score (50% weight)
    matrix_score = matrix_similarity_score(matrix)

    # 2. Pixel-based score after alignment (50% weight)
    if matrix is not None:
        aligned = cv2.warpAffine(img2, matrix, (img1.shape[1], img1.shape[0]))
        pixel_score = normalized_cross_correlation(img1, aligned)
    else:
        pixel_score = 0.0

    return 0.5 * matrix_score + 0.5 * pixel_score

def normalized_cross_correlation(img1, img2):
    """Calculate the normalized cross-correlation (NCC) between two images."""
    # Convert to grayscale
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY).astype(np.float32)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY).astype(np.float32)

    # Normalize to zero mean and unit variance
    gray1 = (gray1 - np.mean(gray1)) / (np.std(gray1) + 1e-8)
    gray2 = (gray2 - np.mean(gray2)) / (np.std(gray2) + 1e-8)

    # Calculate the correlation
    return np.mean(gray1 * gray2)

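# NCC ranges from -1 to 1: identical images score 1.0, uncorrelated content
# scores near 0, and an inverted copy scores near -1.
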
def find_duplicate_with_rotation(img1, img2):
    """Feature-based similarity check using ORB keypoint matching."""
    # Initialize the ORB detector
    orb = cv2.ORB_create()

    # Find keypoints and descriptors (converting to grayscale first)
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
    kp1, des1 = orb.detectAndCompute(gray1, None)
    kp2, des2 = orb.detectAndCompute(gray2, None)

    # No descriptors in either image means nothing to match
    if des1 is None or des2 is None:
        return 0

    # Create a brute-force matcher for binary descriptors
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    # Match descriptors
    matches = bf.match(des1, des2)

    # Sort matches by distance (best matches first)
    matches = sorted(matches, key=lambda x: x.distance)

    # Return the match count (higher means more similar)
    return len(matches)

""" |
|
|
|
|
xxhash is about 5–10x faster than SHA256, non-cryptographic. |
|
|
|
|
If you want an even lighter setup (no installs), we can use zlib.crc32 instead — |
|
|
|
|
but xxhash is better if you care about collisions! |
|
|
|
|
""" |
|
|
|
|
def quick_file_hash(file_path):
    """Hash a file with xxhash64; returns a hex digest, or None on error."""
    hasher = xxhash.xxh64()  # 64-bit, very fast hash
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):  # Read in 8 KB chunks
                hasher.update(chunk)
    except OSError as e:
        print(f"Error hashing file: {e}")
        return None
    return hasher.hexdigest()

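# A minimal sketch of the stdlib-only alternative mentioned above: zlib.crc32
# needs no third-party install but has weaker collision resistance than
# xxhash. (Illustrative only; not used by the main flow below.)
def quick_file_crc32(file_path):
    import zlib  # local import keeps this optional sketch self-contained
    crc = 0
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):  # Read in 8 KB chunks
                crc = zlib.crc32(chunk, crc)
    except OSError as e:
        print(f"Error hashing file: {e}")
        return None
    return format(crc & 0xFFFFFFFF, '08x')
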
# --- Example usage ---
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python3 dedup.py file1.jpg file2.jpg")
        sys.exit(1)

    file1 = sys.argv[1]
    file2 = sys.argv[2]

    # Quick content hashes: identical files need no alignment at all
    hash1 = quick_file_hash(file1)
    hash2 = quick_file_hash(file2)

    if hash1 is not None and hash1 == hash2:
        print("xxHash found duplicates")
        print("✅ Perfect match - images are identical")
        print("No transformation needed")
        sys.exit(1)

    # Load the (potentially large, e.g., 4000x3000) images
    large_img1 = cv2.imread(file1)
    large_img2 = cv2.imread(file2)
    if large_img1 is None or large_img2 is None:
        print("Could not read one or both images")
        sys.exit(1)

    # Align with downscaling (initial pass at 1/4 size)
    aligned, matrix, angle = align_with_downscaling(
        large_img1, large_img2,
        downscale_factor=4,
        try_common_rotations=True
    )

    # Save result
    # cv2.imwrite('aligned_large.jpg', aligned)

    # Print debug info
    print(f"Detected rotation: {angle}°")
    print(f"Final transformation matrix:\n{matrix}")

    # ORB match count (higher means more similar)
    score = find_duplicate_with_rotation(large_img1, aligned)
    print(f"ORB matches: {score}")

    # Calculate scores
    matrix_score = matrix_similarity_score(matrix)
    decomposed_score = decomposed_similarity_score(matrix, large_img1.shape[1])
    combined_score = comprehensive_similarity(large_img1, aligned, matrix)

    # Check for perfect alignment
    if matrix_score == 1.0 and decomposed_score == 1.0 and combined_score == 1.0:
        print("✅ Perfect match - images are identical")
        print("No transformation needed")
        exit_code = 1
    elif matrix_score > 0.9 and decomposed_score > 0.9 and combined_score > 0.7:
        print("⚠️ Near-perfect alignment - minor differences detected")
        exit_code = 2
    else:
        print("❌ Significant transformation required")
        exit_code = 0

    print(f"Matrix deviation score: {matrix_score:.4f}")
    print(f"Decomposed similarity: {decomposed_score:.4f}")
    print(f"Combined similarity: {combined_score:.4f}")
    sys.exit(exit_code)

""" |
|
|
|
|
Matrix-based scores are fast but don't consider image content |
|
|
|
|
Decomposed analysis gives more interpretable results (separate rotation/scale/translation) |
|
|
|
|
Combined approaches with pixel comparison are most accurate but slower |
|
|
|
|
Normalization is crucial - translation should be relative to image size |
|
|
|
|
""" |