You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
463 lines
15 KiB
463 lines
15 KiB
import signal
|
|
import threading
|
|
import os
|
|
import sys
|
|
import xxhash
|
|
import cv2
|
|
import numpy as np
|
|
import time
|
|
|
|
"""
|
|
Copyright (c) 2025 by Robert Strutts
|
|
License: MIT
|
|
"""
|
|
|
|
# Module-wide switch read by exit_timer() and Timer.print_result():
# when True, timing output is wrapped in the ANSI color codes below.
use_ANSI_Colors = True

# ANSI escape codes for colors
WHITE = "\033[97m"
BRIGHT_GREEN = "\033[92m"
BRIGHT_YELLOW = "\033[93m"
BRIGHT_RED = "\033[91m"
RESET = "\033[0m"  # appended after colored text by the printing helpers

# Reference timestamp taken when this module's top-level code runs;
# exit_timer() reports the time elapsed since this point.
start = time.perf_counter()
|
|
|
|
def _set_ansi_colors(enabled):
    """Store *enabled* in the module-level ``use_ANSI_Colors`` switch."""
    global use_ANSI_Colors
    use_ANSI_Colors = enabled


def enable_ansi():
    """Turn ANSI-colored timing output on."""
    _set_ansi_colors(True)


def disable_ansi():
    """Turn ANSI-colored timing output off (plain text only)."""
    _set_ansi_colors(False)
|
|
|
|
def kill_all():
    """Terminate the current process immediately, without any cleanup.

    The process sends itself a kill signal, so no ``finally`` blocks,
    ``atexit`` hooks or buffered writes get a chance to run afterwards.
    """
    # Flush explicitly: a hard kill discards whatever is still buffered.
    print("KILLING PROCESS", flush=True)

    # SIGKILL is not defined on Windows. There, os.kill() with any signal
    # other than the CTRL_* events terminates the process unconditionally,
    # so SIGTERM is an equivalent fallback.
    kill_signal = getattr(signal, "SIGKILL", signal.SIGTERM)
    os.kill(os.getpid(), kill_signal)  # Force kernel-level termination
|
|
|
|
def exit_handler(signum, frame):
    """Signal handler that delegates process termination to a new thread.

    Args:
        signum: Signal number passed in by the signal machinery (unused).
        frame: Interrupted stack frame passed in by the signal machinery (unused).
    """
    killer = threading.Thread(target=kill_all)
    killer.start()  # kill_all runs in its own thread


# CTRL+C will Exit NOW!!!
signal.signal(signal.SIGINT, exit_handler)
|
|
|
|
def get_color_for_timer(total):
    """Pick the ANSI color used to display a duration of *total* seconds.

    Bands:
        total < 1        -> BRIGHT_GREEN
        1 <= total <= 7  -> WHITE
        total >= 10      -> BRIGHT_RED
        anything else    -> BRIGHT_YELLOW (i.e. above 7 and below 10)
    """
    if total < 1:
        return BRIGHT_GREEN
    if 1 <= total <= 7:
        return WHITE
    if total >= 10:
        return BRIGHT_RED
    return BRIGHT_YELLOW
|
|
|
|
def exit_timer(level):
    """Print the total run time since module load, then exit the program.

    Args:
        level: Exit status handed to ``sys.exit``.

    Raises:
        SystemExit: Always, carrying *level*.
    """
    total_time = time.perf_counter() - start
    message = f"⏱ Execution took {total_time:.4f} seconds"

    if use_ANSI_Colors:
        print(f"{get_color_for_timer(total_time)}{message}{RESET}")
    else:
        print(message)

    # sys.exit() instead of the bare exit(): the latter is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    sys.exit(level)
|
|
|
|
class Timer:
    """Small stopwatch usable directly or as a context manager.

    As a context manager it starts on entry and, on exit, stops and prints
    the elapsed time (also when the body raised; the exception still
    propagates because ``__exit__`` returns None).
    """

    def __init__(self, name=None):
        """Create a timer labelled *name* (defaults to ``"Timer"``)."""
        self.name = name if name else "Timer"
        self.start_time = None  # perf_counter() value at start(), or None
        self.end_time = None    # perf_counter() value at stop(), or None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        self.print_result()

    def start(self):
        """Start (or restart) the timer."""
        self.start_time = time.perf_counter()
        # Drop any previous stop mark; otherwise elapsed() on a restarted
        # timer would subtract the new start from the old end.
        self.end_time = None

    def stop(self):
        """Record the stop time."""
        self.end_time = time.perf_counter()

    def elapsed(self):
        """Return the elapsed seconds.

        While the timer is still running, this is the time since start();
        after stop(), it is the fixed start-to-stop duration.

        Raises:
            ValueError: If the timer was never started.
        """
        if self.start_time is None:
            raise ValueError("Timer has not been started")
        if self.end_time is None:
            return time.perf_counter() - self.start_time
        return self.end_time - self.start_time

    def print_result(self):
        """Print ``<name>: ⏱ <seconds>``, colored when ANSI output is enabled."""
        elapsed = self.elapsed()
        if use_ANSI_Colors:
            use_color = get_color_for_timer(elapsed)
            print(f"{self.name}: {use_color}⏱ {elapsed:.6f} seconds{RESET}")
        else:
            print(f"{self.name}: ⏱ {elapsed:.6f} seconds")
|
|
|
|
def align_with_downscaling(img1, img2, downscale_factor=4, try_common_rotations=True):
    """
    Aligns images using a multi-scale approach with initial downscaling

    A coarse alignment is estimated on downscaled copies, then refined with
    ECC at full resolution using the coarse result as the starting point.

    Args:
        img1: Reference image (numpy array)
        img2: Image to align (numpy array)
        downscale_factor: How much to reduce size for initial alignment (e.g., 4 = 1/4 size)
        try_common_rotations: Whether to test common rotations first

    Returns:
        aligned_img: Aligned version of img2
        transform_matrix: Final transformation matrix
        rotation_angle: Detected simple rotation (None if not found)

        If either alignment stage fails, ``(img2, None, None)`` is returned.
        When the failure happens in the full-resolution stage, the returned
        image is img2 after any detected rotation has been applied.
    """
    # 1. First alignment at low resolution
    with Timer("1st alignment at Low Res-Downsaling"):
        small1 = downscale_image(img1, downscale_factor)
        small2 = downscale_image(img2, downscale_factor)
    print("Done downscaling...")
    print("Please wait...Rotation starting.")

    # Get initial alignment at low resolution
    with Timer("2nd alignment at Low Res-Rotations"):
        _, init_matrix, rotation_angle = align_with_ecc_and_rotation(
            small1, small2, try_common_rotations
        )
    print("Done rotating low res image...")
    if init_matrix is None:
        return img2, None, None  # Alignment failed

    # 2. Refine alignment at full resolution with initial estimate
    # Apply the rotation if one was detected
    if rotation_angle is not None:
        img2 = rotate_image(img2, rotation_angle)

    with Timer("Scaling translation components"):
        # Scale up the transformation matrix
        full_matrix = init_matrix.copy()
        # downscale_image() leaves the image untouched for factors <= 1, so
        # the translation only needs rescaling when a real downscale happened.
        if downscale_factor > 1:
            full_matrix[:2, 2] *= downscale_factor  # Scale translation components
    print("Done scale-up/transform...")

    with Timer("Convert images to grayscale"):
        # Convert images to grayscale for final alignment
        gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
    print("Done greyscale alignment...")

    # Set criteria for final alignment
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 500, 1e-6)
    print("Please wait...ECC initial estimate.")

    try:
        with Timer("ECC init"):
            # Run ECC with initial estimate
            cc, full_matrix = cv2.findTransformECC(
                gray1, gray2, full_matrix, cv2.MOTION_AFFINE, criteria
            )

        with Timer("Apply final transformation to color image"):
            # Apply final transformation to color image
            aligned_img = cv2.warpAffine(
                img2, full_matrix, (img1.shape[1], img1.shape[0]),
                flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
            )

        return aligned_img, full_matrix, rotation_angle
    except cv2.error:
        # Only OpenCV failures (e.g. ECC not converging) are treated as a
        # failed alignment; anything else is left to propagate.
        return img2, None, None
|
|
|
|
def downscale_image(img, factor):
    """Downscale image by specified factor while preserving aspect ratio

    Args:
        img: Image as a numpy array (height and width are the first two axes).
        factor: Divisor applied to both width and height.

    Returns:
        A copy of img when factor <= 1, otherwise the resized image.
    """
    if factor <= 1:
        return img.copy()

    height, width = img.shape[:2]
    # Clamp to one pixel: int() truncation would otherwise yield a zero-sized
    # target for images smaller than `factor`, which cv2.resize rejects.
    new_size = (max(1, int(width / factor)), max(1, int(height / factor)))

    # Use area interpolation for downscaling (best for reduction)
    return cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)
|
|
|
|
def rotate_image(image, angle):
|
|
"""Rotate image by specified angle (0, 90, 180, or 270 degrees)"""
|
|
if angle == 90:
|
|
return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
|
|
elif angle == 180:
|
|
return cv2.rotate(image, cv2.ROTATE_180)
|
|
elif angle == 270:
|
|
return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
|
|
return image
|
|
|
|
def try_ecc_alignment(target, moving):
    """Try ECC alignment and return aligned image, matrix, and correlation coefficient

    Args:
        target: Reference image passed to cv2.findTransformECC as the template.
        moving: Image to be aligned to the target.

    Returns:
        (aligned, warp_matrix, cc) on success, or (moving, None, 0) when
        OpenCV reports an error.
    """
    # Initialize warp matrix
    warp_matrix = np.eye(2, 3, dtype=np.float32)

    # Set criteria
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6)

    try:
        # Run ECC
        cc, warp_matrix = cv2.findTransformECC(
            target, moving, warp_matrix, cv2.MOTION_AFFINE, criteria
        )

        # Apply the transformation
        aligned = cv2.warpAffine(
            moving, warp_matrix, (target.shape[1], target.shape[0]),
            flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
        )
    except cv2.error:
        # Catch OpenCV failures only, so interpreter-level exceptions
        # (SystemExit, KeyboardInterrupt, programming errors) are not hidden.
        return moving, None, 0

    return aligned, warp_matrix, cc
|
|
|
|
def apply_transform(image, matrix, target_shape):
    """Warp a color image with a 2x3 (affine) or 3x3 (homography) matrix.

    The matrix is applied with WARP_INVERSE_MAP and the output size is taken
    from target_shape (indexed as height, width). The image is returned
    untouched when matrix is None or has any other shape.
    """
    if matrix is None:
        return image

    shape = matrix.shape
    if shape == (2, 3):  # Affine
        warp = cv2.warpAffine
    elif shape == (3, 3):  # Homography
        warp = cv2.warpPerspective
    else:
        return image

    output_size = (target_shape[1], target_shape[0])
    return warp(
        image, matrix, output_size,
        flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP
    )
|
|
|
|
def align_ecc(img1, img2):
    """Align img2 to img1 with a single ECC pass.

    Args:
        img1: Reference image (BGR numpy array).
        img2: Image to align (BGR numpy array).

    Returns:
        (aligned_img, warp_matrix) on success, or (img2, None) when OpenCV
        reports an error.
    """
    # Convert to grayscale
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    # Define motion model (affine or homography)
    warp_mode = cv2.MOTION_AFFINE  # or cv2.MOTION_HOMOGRAPHY

    if warp_mode == cv2.MOTION_HOMOGRAPHY:
        warp_matrix = np.eye(3, 3, dtype=np.float32)
    else:
        warp_matrix = np.eye(2, 3, dtype=np.float32)

    # Specify termination criteria
    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 1000, 1e-6)

    output_size = (img1.shape[1], img1.shape[0])
    # The matrix from findTransformECC has to be applied with
    # WARP_INVERSE_MAP, as every other warp of an ECC matrix in this file does.
    warp_flags = cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP

    # Run ECC
    try:
        cc, warp_matrix = cv2.findTransformECC(
            gray1, gray2, warp_matrix, warp_mode, criteria
        )

        if warp_mode == cv2.MOTION_HOMOGRAPHY:
            aligned_img = cv2.warpPerspective(
                img2, warp_matrix, output_size, flags=warp_flags
            )
        else:
            aligned_img = cv2.warpAffine(
                img2, warp_matrix, output_size, flags=warp_flags
            )
    except cv2.error:
        # Only OpenCV failures count as a failed alignment.
        print("Alignment failed")
        return img2, None

    return aligned_img, warp_matrix
|
|
|
|
def align_with_ecc_and_rotation(img1, img2, try_common_rotations=True):
    """
    Align img2 to img1 with ECC, optionally probing quarter-turn rotations first.

    Args:
        img1: Reference image (numpy array)
        img2: Image to align (numpy array)
        try_common_rotations: When True, ECC is attempted on img2 rotated by
            0, 90, 180 and 270 degrees and the best-correlating result is kept.

    Returns:
        aligned_img: Aligned version of img2
        transform_matrix: Transformation matrix used
        rotation_angle: Detected rotation angle (None if not a simple rotation)
    """
    # ECC operates on single-channel images.
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    if try_common_rotations:
        top_cc, top_matrix, top_angle = -1, None, None

        for candidate in (0, 90, 180, 270):
            _, matrix, cc = try_ecc_alignment(gray1, rotate_image(gray2, candidate))
            if cc > top_cc:
                top_cc, top_matrix = cc, matrix
                # A zero-degree winner is reported as "no rotation".
                top_angle = candidate or None

        if top_cc > 0.3:  # Good enough alignment found
            # Replay the winning rotation and warp on the color image.
            color = img2 if top_angle is None else rotate_image(img2, top_angle)
            if top_matrix is not None:
                color = apply_transform(color, top_matrix, img1.shape)
            return color, top_matrix, top_angle

    # Rotation probing disabled or inconclusive: fall back to plain ECC.
    aligned_img, transform_matrix = align_ecc(img1, img2)
    return aligned_img, transform_matrix, None
|
|
|
|
def matrix_similarity_score(matrix):
    """
    Calculate similarity score based on deviation from identity matrix.

    Returns 1 for perfect match (identity), decreasing towards 0 for large transformations.

    Args:
        matrix: 2x3 affine or 3x3 homography numpy array, or None. The
            input array is not modified.

    Returns:
        float in [0, 1]; 0.0 when matrix is None or has an unsupported shape.
    """
    if matrix is None:
        return 0.0  # Alignment failed

    if matrix.shape == (2, 3):  # Affine
        ideal = np.eye(2, 3, dtype=np.float32)
    elif matrix.shape == (3, 3):  # Homography
        ideal = np.eye(3, dtype=np.float32)
    else:
        return 0.0

    # Work on a floating-point copy so integer matrices can be divided in place.
    normalized_matrix = matrix.astype(np.result_type(matrix.dtype, np.float32))

    # Normalize translation components (assuming 1000px as reference).
    # Only the first two rows hold translation: in a 3x3 matrix the [2, 2]
    # entry is the homogeneous scale and must stay comparable to the
    # identity's 1, otherwise an identity homography would not score 1.
    normalized_matrix[:2, 2] /= 1000.0

    # Calculate Frobenius norm of difference
    diff = np.linalg.norm(normalized_matrix - ideal)

    # Convert to similarity score (0-1)
    score = np.exp(-diff)  # Exponential decay
    return float(np.clip(score, 0, 1))
|
|
|
|
def decomposed_similarity_score(matrix, img_width):
    """
    Calculate score by analyzing translation, rotation, and scaling separately.

    Args:
        matrix: 2x3 affine numpy array, or None.
        img_width: Used to normalize both translation components.

    Returns:
        float in [0, 1]: 1 minus the summed penalties, floored at 0.
        0.0 when matrix is None, is not 2x3, or img_width is 0.
    """
    if matrix is None or matrix.shape != (2, 3):
        return 0.0
    if img_width == 0:
        # Translation cannot be normalized; report "no similarity" directly
        # instead of relying on inf/nan arithmetic to end up at 0.
        return 0.0

    # Extract rotation and scale from the linear part
    a, b = matrix[0, 0], matrix[0, 1]
    c, d = matrix[1, 0], matrix[1, 1]
    scale_x = np.sqrt(a * a + b * b)
    scale_y = np.sqrt(c * c + d * d)
    rotation = np.arctan2(-b, a)

    # Extract translation (normalized by image width)
    tx = matrix[0, 2] / img_width
    ty = matrix[1, 2] / img_width

    # Calculate penalties (adjust weights as needed)
    # The 0.5 factor halves the translation term relative to the others.
    translation_penalty = np.sqrt(tx * tx + ty * ty) * 0.5
    scale_penalty = np.abs(scale_x - 1) + np.abs(scale_y - 1)
    rotation_penalty = np.abs(rotation) / np.pi  # Normalized to 0-1

    # Combine penalties
    total_penalty = translation_penalty + scale_penalty + rotation_penalty

    # Convert to similarity score; always a plain Python float.
    return float(max(0.0, 1.0 - total_penalty))
|
|
|
|
def comprehensive_similarity(img1, img2, matrix):
    """Combine matrix analysis with image comparison

    Args:
        img1: Reference image (BGR numpy array).
        img2: Image compared against img1 (BGR numpy array).
        matrix: Alignment matrix for img2 (2x3 or 3x3), or None if
            alignment failed.

    Returns:
        0.5 * matrix-based score + 0.5 * pixel-based score.
    """
    # 1. Matrix-based score (50% weight)
    matrix_score = matrix_similarity_score(matrix)

    # 2. Pixel-based score after alignment (50% weight)
    if matrix is not None:
        # Use the shared helper so the matrix is applied like everywhere else
        # in this file (WARP_INVERSE_MAP) and 3x3 matrices are supported too.
        aligned = apply_transform(img2, matrix, img1.shape)
        pixel_score = normalized_cross_correlation(img1, aligned)
    else:
        pixel_score = 0.0

    return 0.5 * matrix_score + 0.5 * pixel_score
|
|
|
|
def normalized_cross_correlation(img1, img2):
    """Return the normalized cross-correlation of two BGR images.

    Each image is converted to float32 grayscale and standardized (mean
    subtracted, divided by its standard deviation plus 1e-8); the result is
    the mean of the element-wise product.
    """
    def _standardized_gray(image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY).astype(np.float32)
        # The small epsilon keeps a constant image from dividing by zero.
        return (gray - np.mean(gray)) / (np.std(gray) + 1e-8)

    return np.mean(_standardized_gray(img1) * _standardized_gray(img2))
|
|
|
|
def find_duplicate_with_rotation(img1, img2):
    """Count cross-checked ORB feature matches between two images.

    Returns:
        Number of matches (int); more matches means the images share more
        features. 0 when either image yields no descriptors.
    """
    # Initialize ORB detector
    orb = cv2.ORB_create()

    # Find descriptors (keypoints themselves are not needed here)
    _, des1 = orb.detectAndCompute(img1, None)
    _, des2 = orb.detectAndCompute(img2, None)

    # detectAndCompute returns None descriptors when no keypoints are found;
    # the matcher cannot work with that.
    if des1 is None or des2 is None:
        return 0

    # Create BFMatcher object
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    # Only the count is reported, so the matches need no sorting.
    return len(bf.match(des1, des2))
|
|
|
|
def get_image_dimensions_cv(img):
    """Return ``(width, height)`` of an image array, or ``(None, None)`` for None."""
    if img is None:
        return None, None

    rows, cols = img.shape[:2]  # shape is ordered (height, width, ...)
    return cols, rows
|
|
|
|
def check_file_size_bytes(file_path, too_small, too_large):
    """Check that a file's size lies within ``[too_small, too_large]`` bytes.

    Returns:
        None when the size is acceptable, otherwise a message describing the
        problem (too small, too large, missing file, or any other error).
    """
    try:
        size = os.path.getsize(file_path)

        if size < too_small:
            return f"File is ({size} bytes) must be over {too_small}"
        if size > too_large:
            return f"File is ({size} bytes) must be less than {too_large}"
        return None
    except FileNotFoundError:
        return "File not found"
    except Exception as err:
        return f"Error checking file size: {err!s}"
|
|
|
|
"""
|
|
xxhash is about 5–10x faster than SHA256, non-cryptographic.
|
|
If you want an even lighter setup (no installs), we can use zlib.crc32 instead —
|
|
but xxhash is better if you care about collisions!
|
|
"""
|
|
def quick_file_hash(file_path):
    """Return the xxh64 hex digest of a file's contents.

    Args:
        file_path: Path of the file to hash.

    Returns:
        Hex digest string, or None when the file could not be read. (A
        digest of the data read before the failure would look like a valid
        hash and make every unreadable file appear identical.)
    """
    hasher = xxhash.xxh64()  # 64-bit very fast hash
    try:
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):  # Read in 8KB chunks
                hasher.update(chunk)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file; ValueError: malformed path.
        print(f"Error hashing file: {e}")
        return None
    return hasher.hexdigest()
|
|
|