# Simplified BSD License (FreeBSD License)
#
# Copyright (c) 2025, Daily Data Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package ZFS_Utils;

use strict;
use warnings;
use Exporter 'import';
use Data::Dumper;
use POSIX qw(strftime);
use File::Path qw(make_path);

# library of ZFS related utility functions
# Copyright 2025 Daily Data Inc. <rodo@dailydata.net>

# currently used for sneakernet scripts, but plans to expand to other ZFS related tasks
# functions include:
#   runCmd: execute a command and return its output (captures exit status in $lastRunError;
#           supports optional stderr merge via $merge_stderr)
#   shredFile: securely delete a file using gshred (note: not effective on ZFS due to COW)
#   logMsg: timestamped logging to a file and optionally to console; respects $verboseLoggingLevel
#   loadConfig: load a YAML configuration file into a hashref and returns the hashref, or empty hashref on error
#   isMounted: check if a drive/device is mounted by mountPath or disk label; returns mountPath where mounted if found, empty string otherwise
#   mountDriveByLabel: find and mount a drive by its GPT label (supports ufs/msdos; waits
#           for device and creates mountpoint)
#   unmountDriveByLabel: unmount a drive found by GPT label and remove the mountpoint if empty
#   mountGeli: high level orchestrator to decrypt multiple GELI devices and import/mount a ZFS pool
#   decryptAndMountGeli: attach GELI devices, optionally build a combined key, import the pool
#           and mount ZFS datasets
#   makeGeliKey: create a GELI key by XOR'ing a remote binary keyfile and a local hex key;
#           automatically detects key size from remote keyfile (must be >= 128 bits and divisible by 8);
#           local key must match remote key size; writes binary key file with mode 0600
#   findGeliDisks: discover candidate disks suitable for GELI on the host
#   makeReplicateCommands: build zfs send/receive command lists from snapshot lists and prior status;
#           intelligently determines recursive vs per-filesystem sends and incremental vs full sends
#           based on snapshot availability; filters snapshots by matching parent path + dataset name
#           to avoid false matches with similarly-named datasets in different locations
#   getLatestSnapshots: find the latest snapshot for each filesystem based on date/time stamps in
#           snapshot names; parses YYYY-MM-DD and YYYY-MM-DD_HH:MM:SS format timestamps and returns
#           one snapshot per filesystem (the one with the latest date/time); uses parseSnapshotDateTime
#           helper for date parsing
#   parseSnapshotDateTime: internal helper function to parse date/time from snapshot names; accepts
#           multiple formats (YYYY-MM-DD, YYYY-MM-DD_HH:MM:SS, etc.) and returns epoch time or undef
#   snapShotReport: generate a comparison report between original and current snapshot states;
#           shows changes per filesystem with flags (CONSISTENT/ADDED/UNCHANGED) and counts snapshots
#           added between original and current states
#   datasetExists: check if a ZFS dataset exists on the system; returns dataset name if exists,
#           empty string if not (uses grep to suppress stderr messages from zfs list)
#   sendReport: helper to deliver replication reports (email/file) — exported for scripts to implement
#   fatalError: helper to log a fatal condition and die (convenience wrapper)
#   getDirectoryList: utility to list directory contents with optional filters
#   cleanDirectory: safe directory cleaning utility used by snapshot pruning helpers
# 
#   exported package variables: $logFileName, $displayLogsOnConsole, $lastRunError, $verboseLoggingLevel, $displayLogsTTY
#
# Revision History (oldest to newest):
# v1.0    (2025-12-15) - Initial tested release
# v1.0.1  (2025-12-15) - Added verbose logging control
# v1.1.0  (2025-12-17) - Added $VERSION, increased max verbosity to 5, optimized makeReplicateCommands
# v1.2.0  (2025-12-19) - Added getLatestSnapshots, parseSnapshotDateTime, snapShotReport functions
# v1.2.1  (2025-12-21) - Added datasetExists, fixed parseSnapshotDateTime dot-separated time bug
# v1.2.2  (2025-12-21) - Enhanced loadConfig validation, added _findMissingKeys helper
# v1.2.3  (2025-12-23) - Updated makeGeliKey to support variable key sizes
# v1.2.4  (2026-01-13) - Refactored loadConfig, added makeConfig and checkConfigKeys functions
# v1.2.5  (2024-06-06) - Added TTY logging support via $displayLogsTTY
# v1.2.6  (2026-01-15) - Added interpolateConfig, enhanced loadConfig for automatic interpolation
# v1.2.7  (2026-01-16) - Fixed cleanDirectory bug with non-existent directories
# v1.2.8  (2026-01-16) - Enhanced getDirectoryList with filter and invertFilter parameters
# v1.2.9  (2026-01-17) - Enhanced mountDriveByLabel with case-insensitive label discovery
# v1.2.10 (2026-01-17) - Enhanced copyReportToDrive with timestamp prepending
# v1.3.0  (2026-01-18) - Enhanced logging with automatic caller tracking, added sendInstructions function;
#                        changed sendInstructions to use global $displayLogsTTY (removed $config parameter)
# v1.3.1  (2026-01-18) - Converted configuration keys from snake_case to camelCase (checkInterval, mountPoint, waitTimeout)
# v1.3.2  (2026-01-18) - Added isMounted() function; refactored mountDriveByLabel() to use isMounted() for better error handling
#
# For detailed changelog, see CHANGELOG.md

# Exported functions and variables

our @EXPORT_OK = qw(loadConfig makeConfig interpolateConfig checkConfigKeys shredFile isMounted mountDriveByLabel unmountDriveByLabel mountGeli logMsg sendInstructions runCmd makeReplicateCommands getLatestSnapshots parseSnapshotDateTime snapShotReport sendReport fatalError getDirectoryList cleanDirectory datasetExists $logFileName $displayLogsOnConsole $lastRunError $verboseLoggingLevel $displayLogsTTY);

our $VERSION = '1.3.2';

# these are variables which affect the flow of the program and are exported so they can be modified by the caller
our $logFileName = '/tmp/zfs_utils.log'; # this can be overridden by the caller, and turned off with empty string
our $displayLogsOnConsole = 1; # if non-zero, log messages are also printed to console
our $merge_stderr = 0; # if set to 1, stderr is captured in runCmd
our $lastRunError = 0; # tracks the last error code from runCmd
our $verboseLoggingLevel = 0; # if non-zero, logMsg will include more verbose output
our $displayLogsTTY = ''; # if set to a TTY device path, log messages are also printed to that TTY
   
# Execute a command (all arguments joined with spaces) and return its output.
# If called in scalar context, returns the full output as a single string.
# If called in list context, returns the output split into lines (empty list for no output).
# If $merge_stderr is true (package default: 0), stderr is merged into stdout.
# NOTE: if a caller has explicitly undef'd $merge_stderr, it is reset to 1 here
#       (legacy behavior); set it to 0 to keep stderr separate.
# Sets $lastRunError to the raw $? status of the command.
# On failure to execute, logs the error and returns '' (scalar) / () (list).
# Dies (after logging) if the child was killed by a signal.
sub runCmd {
   my $cmd = join( ' ', @_ );
   $merge_stderr = 1 unless defined $merge_stderr;
   my $output = '';

   logMsg( "Running command [$cmd]", 1 ) if $verboseLoggingLevel >= 2;
   $cmd .= ' 2>&1' if $merge_stderr;
   $output = `$cmd`;
   $lastRunError = $?;
   if ( $lastRunError ) {
      if ($? == -1) {
         logMsg( "failed to execute: $!");
         # honor calling context on this early-exit path too
         # (previously always returned '', even in list context)
         return wantarray ? () : '';
      } elsif ($? & 127) { # child killed by a signal: fatal error, exit program
         logMsg( sprintf( "child died with signal %d, %s coredump\n", ($? & 127),  ($? & 128) ? 'with' : 'without' ) );
         die;
      } elsif ($? >> 8) { # non-zero exit code: log it but still return the output
         logMsg( sprintf( "child exited with value %d\n", $? >> 8 ) );
      }
   }
   $output //= '';

   if (wantarray) {
      return $output eq '' ? () : split(/\n/, $output);
   } else {
      return $output;
   }
}

# Securely delete a file with gshred: overwrite (3 passes by default,
# limited to the first 32 bytes via -s 32), force (-f), then unlink (-u).
# NOTE: this will not work on ZFS, since ZFS is CopyOnWrite (COW),
# so assuming file is on something without COW (ramdisk, UFS, etc).
#
# Arguments:
#   $filename - path of the file to shred (no-op if undef or nonexistent)
#
# Returns: nothing
sub shredFile {
   my $filename = shift;
   return unless defined $filename && -e $filename;
   # List-form system() bypasses the shell entirely, so filenames containing
   # spaces or shell metacharacters cannot break or inject into the command
   # (the old backtick form interpolated $filename into a shell string).
   system('/usr/local/bin/gshred', '-u', '-f', '-s', '32', $filename);
   return;
}

# Write a timestamped message to the log file, console, and/or a TTY.
#
# This is the primary logging mechanism used throughout the ZFS utilities.
#
# Arguments:
#   $msg             - text to log (required)
#   $logCaller       - truthy => embed caller file:line info (optional, default: 0)
#   $filename        - log file path (optional, default: package $logFileName)
#   $timeStampFormat - strftime() format for the timestamp (optional, default: '%Y-%m-%d %H:%M:%S')
#
# The emitted line is "timestamp\tmessage\n", or "timestamp\t[file:line]\tmessage\n"
# when caller info is requested.  Destinations, in order:
#   1. the log file, when $filename is defined and non-empty
#      (dies if the file cannot be opened for append)
#   2. STDOUT, when $displayLogsOnConsole is true
#   3. the device named by $displayLogsTTY, when set (warns if unopenable)
#
# Global variables used:
#   $logFileName          - default log file path
#   $displayLogsOnConsole - if non-zero, also print to console
#   $displayLogsTTY       - if set to a TTY device path, also write there
#
# Returns: nothing
sub logMsg {
    my ($text, $wantCaller, $logFile, $stampFmt) = @_;
    $wantCaller //= 0;
    $logFile    //= $logFileName;
    $stampFmt   //= '%Y-%m-%d %H:%M:%S';

    my $stamp = strftime($stampFmt, localtime());

    # Optional "[file:line]" tag identifying the call site
    my $callerTag = '';
    if ($wantCaller) {
        my (undef, $callerFile, $callerLine) = caller();
        $callerTag = "\t[$callerFile:$callerLine]";
    }

    my $line = "$stamp$callerTag\t$text\n";

    # Destination 1: append to the log file when one is configured
    if (defined $logFile && $logFile ne '') {
        open my $logfh, '>>', $logFile or die "Could not open log file $logFile: $!\n";
        print $logfh $line;
        close $logfh;
    }

    # Destination 2: the console
    print $line if $displayLogsOnConsole;

    # Destination 3: a TTY device, best-effort
    if ($displayLogsTTY) {
        if (open(my $ttyfh, '>', $displayLogsTTY)) {
            print $ttyfh $line;
            close($ttyfh);
        }
        else {
            warn "Could not open TTY $displayLogsTTY for logging: $!\n";
        }
    }
}

# Display a message for the system operator.
#
# Prints $message (with a trailing newline) to STDOUT, and duplicates it to
# the TTY device named by the package variable $displayLogsTTY when that is
# set.  Unlike logMsg(), nothing is written to the log file and no timestamp
# is added -- this is purely operator-facing output.
#
# Arguments:
#   $message - the message to display to the system operator
#
# Global variables used:
#   $displayLogsTTY - if set to a TTY device path, also write to that TTY
#
# Returns: nothing
sub sendInstructions {
    my ($message) = @_;

    # The operator always sees the message on the screen
    print "$message\n";

    # Duplicate it to the configured TTY, best-effort
    return unless $displayLogsTTY;
    if (open(my $ttyfh, '>', $displayLogsTTY)) {
        print $ttyfh "$message\n";
        close($ttyfh);
    }
    else {
        warn "Could not open TTY $displayLogsTTY for instructions: $!\n";
    }
    return;
}

# Check if a drive/device is already mounted
# Can check by mountPath (if input starts with '/') or by disk label
# Arguments:
#   $pathOrLabel - mountPath (e.g., '/mnt/sneakernet') or disk label (e.g., 'sneakernet')
# Returns:
#   mountPath where mounted if found, empty string otherwise
# Examples:
#   my $mounted = isMounted('/mnt/sneakernet');  # check by mount path
#   my $mounted = isMounted('sneakernet');        # check by disk label
#   if ($mounted) { print "Mounted at $mounted\n"; }
sub isMounted {
   my ($pathOrLabel) = @_;
   return '' unless defined $pathOrLabel && length($pathOrLabel);

   # Fetch the mount table once and parse it in Perl.  The previous
   # "mount | grep '...'" substring match could false-positive on similar
   # paths (/mnt/a matching /mnt/abc) or on text in the device/options
   # fields; anchored \Q...\E matches below avoid both problems.
   # mount output format: "device on mountpoint (filesystem, options)"
   my @mountLines = runCmd('mount');

   if ($pathOrLabel =~ m{^/}) {
      # It's a path - check if something is mounted there
      logMsg("isMounted: Checking if mountPath $pathOrLabel is mounted", 1) if $verboseLoggingLevel >= 3;

      foreach my $line (@mountLines) {
         # require whitespace on both sides of the exact path so a prefix
         # (e.g. /mnt/data vs /mnt/data2) cannot match
         if ($line =~ m{\son\s\Q$pathOrLabel\E\s}) {
            logMsg("isMounted: Found $pathOrLabel in mount output", 1) if $verboseLoggingLevel >= 3;
            return $pathOrLabel;
         }
      }

      logMsg("isMounted: $pathOrLabel not mounted", 1) if $verboseLoggingLevel >= 3;
      return '';
   } else {
      # It's a label - check if device with this label is mounted
      logMsg("isMounted: Checking if device with label '$pathOrLabel' is mounted", 1) if $verboseLoggingLevel >= 3;

      # Check both /dev/gpt and /dev/msdosfs for the label
      foreach my $devicePath ("/dev/gpt/$pathOrLabel", "/dev/msdosfs/$pathOrLabel") {
         foreach my $line (@mountLines) {
            # Parse the mount output to extract the mount point
            if ($line =~ m{^\Q$devicePath\E\s+on\s+(\S+)\s+}) {
               my $mountPoint = $1;
               logMsg("isMounted: Device $devicePath is mounted at $mountPoint", 1) if $verboseLoggingLevel >= 3;
               return $mountPoint;
            }
         }
      }

      logMsg("isMounted: Device with label '$pathOrLabel' not mounted", 1) if $verboseLoggingLevel >= 3;
      return '';
   }
}

# Case-insensitively look for a device node named $label in device
# directory $dir.  Returns the actual entry name on a match, or an empty
# string when the directory is missing/unreadable or has no match.
sub _findLabelCaseInsensitive {
   my ($dir, $label) = @_;
   return '' unless -d $dir;
   opendir(my $dh, $dir) or do {
      logMsg("Cannot read $dir: $!");
      return '';
   };
   my @entries = grep { !/^\./ } readdir($dh);
   closedir($dh);
   foreach my $entry (@entries) {
      return $entry if lc($entry) eq lc($label);
   }
   return '';
}

# find a drive by its label by scanning /dev/gpt/ and /dev/msdosfs/
# driveInfo is a hashref with the following keys:
#   label         - the GPT label of the drive (required; [a-zA-Z0-9_-]+ only)
#   fstype        - the filesystem type, 'ufs' or 'msdos' (default: ufs)
#   mountPath     - where to mount the drive (default: /mnt/<label>)
#   timeout       - how long to wait for the drive (default: 600 seconds)
#   checkInterval - how often to check for the drive (default: 15 seconds)
# The function performs case-insensitive label discovery across both /dev/gpt
# and /dev/msdosfs; if the label is found in an unexpected location, fstype is
# automatically corrected.
# If the drive is found, mount it on mountPath and return the mountPath.
# If not found, return empty string.
sub mountDriveByLabel {
   my ( $driveInfo ) = @_;
   unless ($driveInfo->{label}) {
      logMsg("mountDriveByLabel: No drive label provided");
      return '';
   }
   unless ( $driveInfo->{label} =~ /^[a-zA-Z0-9_\-]+$/ ) {
      logMsg("mountDriveByLabel: Invalid label '$driveInfo->{label}'");
      return '';
   }

   logMsg("mountDriveByLabel: Looking for drive with label '$driveInfo->{label}'") if $verboseLoggingLevel >= 1;
   # default to /mnt/label if not provided
   $driveInfo->{mountPath} //= "/mnt/$driveInfo->{label}";

   # Check if already mounted at the expected mountPath before setting defaults
   if ( isMounted($driveInfo->{mountPath}) ) {
      logMsg("Drive already mounted at $driveInfo->{mountPath}", 1) if $verboseLoggingLevel >= 2;
      return $driveInfo->{mountPath};
   }

   # Set defaults after checking if already mounted (so we don't set wrong fstype)
   $driveInfo->{fstype} //= 'ufs'; # default to mounting ufs
   # The location for the label depends on filesystem. Only providing access to ufs and msdos here for safety.
   # gpt labeled drives for ufs are in /dev/gpt/, for msdosfs in /dev/msdosfs/
   my $labelPath = $driveInfo->{fstype} eq 'msdos' ? "/dev/msdosfs/$driveInfo->{label}" : "/dev/gpt/$driveInfo->{label}";
   $driveInfo->{timeout} //= 600;       # wait up to 10 minutes by default
   $driveInfo->{checkInterval} //= 15;  # poll every 15 seconds by default

   # wait up to timeout seconds for the device to appear
   while ( $driveInfo->{timeout} > 0 ) {
      last if -e $labelPath;

      # Exact path absent: try case-insensitive discovery, /dev/gpt (ufs)
      # first, then /dev/msdosfs (msdos)
      my ($foundPath, $foundType) = ('', '');
      my $entry = _findLabelCaseInsensitive('/dev/gpt', $driveInfo->{label});
      if ($entry) {
         $foundPath = "/dev/gpt/$entry";
         $foundType = 'ufs';
         logMsg("Found label '$entry' in /dev/gpt (case-insensitive match)", 1) if $verboseLoggingLevel >= 2;
      } else {
         $entry = _findLabelCaseInsensitive('/dev/msdosfs', $driveInfo->{label});
         if ($entry) {
            $foundPath = "/dev/msdosfs/$entry";
            $foundType = 'msdos';
            logMsg("Found label '$entry' in /dev/msdosfs (case-insensitive match)", 1) if $verboseLoggingLevel >= 2;
         }
      }

      # If we found it via case-insensitive search, update our paths and fstype
      if ($foundPath) {
         $labelPath = $foundPath;
         $driveInfo->{fstype} = $foundType;
         logMsg("Updated fstype to '$foundType' based on discovery", 1) if $verboseLoggingLevel >= 2;
         last;
      }

      sendInstructions( "Waiting for drive labeled $driveInfo->{label}, looking in $labelPath\n" );
      sleep $driveInfo->{checkInterval};
      $driveInfo->{timeout} -= $driveInfo->{checkInterval};
   }

   # device never appeared within the timeout
   return '' unless -e $labelPath;

   # ensure the mount point directory exists
   unless ( -d $driveInfo->{mountPath} || make_path($driveInfo->{mountPath}) ) {
      logMsg("Failed to create $driveInfo->{mountPath}: $!");
      return '';
   }

   # mount device
   my $mountOutput = runCmd( "mount -t $driveInfo->{fstype} $labelPath $driveInfo->{mountPath}" );
   if ( $lastRunError ) {
      # Check if it's already mounted (common error case)
      if ( isMounted($driveInfo->{mountPath}) ) {
         logMsg("Drive appears to be already mounted at $driveInfo->{mountPath}", 1) if $verboseLoggingLevel >= 2;
         return $driveInfo->{mountPath};
      }
      logMsg("Failed to mount $labelPath on $driveInfo->{mountPath}: $mountOutput") if $verboseLoggingLevel >= 0;
      return '';
   }
   return $driveInfo->{mountPath};
}

# finds and unmounts a drive defined by $driveInfo.
# on success, removes the mount point if empty.
# Arguments:
#   $driveInfo - hashref with 'label' (required, [a-zA-Z0-9_-]+ only) and
#                optional 'mountPath' (default: /mnt/<label>)
# Returns: the mountPath on success, empty string on any failure
sub unmountDriveByLabel {
   my ( $driveInfo ) = @_;
   unless ($driveInfo->{label}) {
      logMsg("unmountDriveByLabel: No drive label provided");
      return '';
   }
   unless ( $driveInfo->{label} =~ /^[a-zA-Z0-9_\-]+$/ ) {
      logMsg("unmountDriveByLabel: Invalid label '$driveInfo->{label}'");
      return '';
   }

   logMsg("unmountDriveByLabel: Looking for drive with label '$driveInfo->{label}'") if $verboseLoggingLevel >= 1;
   # default to /mnt/label if not provided
   $driveInfo->{mountPath} //= "/mnt/$driveInfo->{label}";

   # use isMounted() (consistent with mountDriveByLabel) rather than a raw
   # substring grep of the mount table, which could false-match similar paths
   unless ( isMounted($driveInfo->{mountPath}) ) {
     logMsg("Drive with label '$driveInfo->{label}' is not mounted", 1) if $verboseLoggingLevel >= 2;
     return '';
   }

   # unmount device
   my $umountOutput = runCmd( "umount $driveInfo->{mountPath}" );
   if ( $lastRunError ) {
     # $! is not set by runCmd, so report the command's own output instead
     logMsg("Failed to unmount $driveInfo->{mountPath}: $umountOutput");
     return '';
   }

   # and remove the directory if empty (find command will return empty string or one filename)
   rmdir $driveInfo->{mountPath} unless runCmd( "find $driveInfo->{mountPath} -mindepth 1 -print -quit");
   return $driveInfo->{mountPath};
}

## Interpolate variables in configuration structure.
##
## Arguments:
##   $config - HASHREF or ARRAYREF to process (modified in place)
##   $vars   - HASHREF containing variable names and their values
##
## Behavior:
##   - Recursively walks through the configuration structure (hashes and arrays)
##   - Replaces strings of the form <varname> with values from $vars hashref
##   - Variable names are matched literally: regex metacharacters in a name
##     are neutralized with \Q...\E, so e.g. "my.var" cannot misfire as a pattern
##   - Modifies the structure in place
##
## Example:
##   Given $vars = {scriptDirectory => '/opt/scripts'}
##   "<scriptDirectory>/file.txt" becomes "/opt/scripts/file.txt"
##
## Returns: nothing (modifies $config in place)
sub interpolateConfig {
   my ($config, $vars) = @_;

   if (ref($config) eq 'HASH') {
      # Process hash values recursively
      for my $key (keys %$config) {
         if (ref($config->{$key})) {
            # Recursively process nested structures
            interpolateConfig($config->{$key}, $vars);
         } elsif (defined $config->{$key}) {
            # Replace <varname> with actual values; \Q...\E treats the
            # variable name as a literal string, not a regex fragment
            for my $varname (keys %$vars) {
               $config->{$key} =~ s/<\Q$varname\E>/$vars->{$varname}/g;
            }
         }
      }
   } elsif (ref($config) eq 'ARRAY') {
      # Process array elements recursively; the loop variable aliases each
      # element, so the in-place substitution modifies the array itself
      for my $element (@$config) {
         if (ref($element)) {
            interpolateConfig($element, $vars);
         } elsif (defined $element) {
            for my $varname (keys %$vars) {
               $element =~ s/<\Q$varname\E>/$vars->{$varname}/g;
            }
         }
      }
   }
}

## Load a YAML or JSON configuration file into a hashref.
## 
## Arguments:
##   $filename - path to the YAML or JSON configuration file
##   $interpolationHash - OPTIONAL hashref for variable interpolation
##
## Behavior:
##   - Reads the file as a string first to check for interpolation markers
##   - If markers (/<[^>]*>/) are found, marks the config as "dirty"
##   - Parses the string to a hash (YAML or JSON depending on format)
##   - If dirty, runs interpolateConfig() to replace markers with values
##   - Saves the interpolated config back to the file (so future loads skip this step)
##
## Parser preference order: YAML::XS, then YAML::Tiny, then JSON::XS/JSON::PP/JSON.
##
## Returns: hashref of loaded configuration or empty hashref on error
##
sub loadConfig {
    my ( $filename, $interpolationHash ) = @_;
    
    $interpolationHash //= {};
    # If no filename was provided, return empty hashref
    return {} unless defined $filename;

    # If file doesn't exist, return empty hashref
    unless (-e $filename) {
      logMsg("Config file $filename does not exist.", 1) if $verboseLoggingLevel >= 2;
      return {};
   }

   # Read the file as a string first
   my $configString;
   {
      local $/; # Enable slurp mode
      open my $fh, '<', $filename or do {
         logMsg("Cannot read config file $filename: $!", 1) if $verboseLoggingLevel >= 2;
         return {};
      };
      $configString = <$fh>;
      close $fh;
   }

   # Check for interpolation markers using regex /<[^>]*>/
   my $isDirty = ($configString =~ /<[^>]*>/);
   
   logMsg("Config file has interpolation markers, will process and save back", 1) if $isDirty && $verboseLoggingLevel >= 2;

   my $config;
   my $isYAML = 0;
   my $isJSON = 0;

   # Try to determine format and parse string to hash
   # Try YAML::XS first, fall back to YAML::Tiny
   # NOTE: a "return" inside the "or do { ... }" blocks below exits loadConfig
   # itself (do-blocks are not subroutines); the no-parser case relies on this.
   eval {
      require YAML::XS;
      YAML::XS->import('Load');
      $config = YAML::XS::Load($configString);
      $isYAML = 1;
      logMsg("using YAML::XS to load $filename", 1) if $verboseLoggingLevel >= 3;
      1;
   } or do {
      eval {
         require YAML::Tiny;
         YAML::Tiny->import();
         my $yaml = YAML::Tiny->read_string($configString);
         $config = $yaml->[0] if $yaml;  # YAML::Tiny returns an arrayref of documents
         $isYAML = 1;
         logMsg("using YAML::Tiny to load $filename", 1) if $verboseLoggingLevel >= 3;
         1;
      } or do {
         # Try JSON as fallback
         for my $json_module ('JSON::XS', 'JSON::PP', 'JSON') {
            if (eval "require $json_module; 1") {
               my $json_obj = $json_module->new;
               # decode() dies on malformed input; the eval lets us fall
               # through to the next JSON module instead of propagating
               eval {
                  $config = $json_obj->decode($configString);
                  $isJSON = 1;
                  logMsg("using $json_module to load $filename", 1) if $verboseLoggingLevel >= 3;
                  1;
               };
               last if $isJSON;
            }
         }
         unless ($isJSON) {
            logMsg("No YAML or JSON parser available. Skipping config load from $filename", 1) if $verboseLoggingLevel >= 2;
            return {};
         }
      };
   };
   
   # Ensure we have a hashref
   unless (defined $config && ref $config eq 'HASH') {
      logMsg("Config file $filename did not produce a HASH.", 1) if $verboseLoggingLevel >= 2;
      return {};
   }
   
   # If dirty, interpolate the config and save it back
   # (only when the caller actually supplied interpolation values)
   if ($isDirty && keys %$interpolationHash) {
      logMsg("Interpolating config variables", 1) if $verboseLoggingLevel >= 2;
      interpolateConfig($config, $interpolationHash);
      
      # Save the interpolated config back to the file
      logMsg("Saving interpolated config back to $filename", 1) if $verboseLoggingLevel >= 2;
      makeConfig($filename, $config);
   }
   
   return $config;
}

## Create a configuration file from a hashref.
##
## Arguments:
##   $filename - path where the configuration file will be written (string)
##   $config   - hashref containing the configuration data to save
##
## Behavior:
##   - Attempts to save the configuration using available YAML libraries first
##     (YAML::XS, YAML::Tiny, YAML); a module is used only if it actually
##     provides a DumpFile() function, resolved via can() so a module lacking
##     it is skipped rather than croaking at import time
##   - If no YAML library is available, tries JSON libraries (JSON::XS, JSON::PP, JSON)
##   - Dies with error message if no suitable library is available
##   - Logs success message when file is created (if $verboseLoggingLevel >= 1)
##
## Returns: nothing (returns early on success, dies on failure)
sub makeConfig {
   my ($filename, $config) = @_;

   # Try YAML libraries first (preferred format)
   for my $yaml_module ('YAML::XS', 'YAML::Tiny', 'YAML') {
      next unless eval "require $yaml_module; 1";
      # Resolve DumpFile with can() instead of import(): Exporter's import()
      # dies outright on a name the module does not export, which would abort
      # the whole fallback chain; can() also avoids polluting our namespace.
      my $dumpFile = $yaml_module->can('DumpFile') or next;
      $dumpFile->($filename, $config);
      logMsg("Created configuration file '$filename' using $yaml_module") if $verboseLoggingLevel >= 1;
      return;
   }

   # If no YAML available, try JSON libraries
   for my $json_module ('JSON::XS', 'JSON::PP', 'JSON') {
      next unless eval "require $json_module; 1";
      my $json_obj = $json_module->new->pretty->canonical;
      open my $fh, '>', $filename
         or die "Could not write to '$filename': $!\n";
      print $fh $json_obj->encode($config);
      close $fh;
      logMsg("Created configuration file '$filename' using $json_module") if $verboseLoggingLevel >= 1;
      return;
   }

   # No suitable library found
   die "Cannot create configuration file: no YAML or JSON library available.\n" .
       "Please install one of: YAML::XS, YAML::Tiny, YAML, JSON::XS, JSON::PP, or JSON\n";
}

## Check that required configuration keys have values.
##
## Arguments:
##   $config             - hashref containing the loaded configuration
##   $configRequiredKeys - arrayref of required key paths (dot-separated notation)
##
## Behavior:
##   - Walks the config hashref following each dot-separated key path;
##     e.g. "key1.key2.key3" checks $config->{key1}->{key2}->{key3}
##   - A key is considered empty when its value is undef, an empty or
##     whitespace-only string, an empty array, or an empty hash
##   - Every missing or empty key is logged (all paths are checked, not
##     just the first failure)
##
## Returns:
##   - 1 (true) if all required keys have values
##   - 0 (false) if any required key is missing or empty
##
## Example:
##   my $requiredKeys = ['datasets', 'transport.label', 'source.hostname'];
##   checkConfigKeys($config, $requiredKeys) or die "Missing required config keys\n";
sub checkConfigKeys {
   my ($config, $configRequiredKeys) = @_;

   return 1 unless defined $configRequiredKeys && ref($configRequiredKeys) eq 'ARRAY';

   my $allKeysValid = 1;

   KEYPATH: foreach my $keyPath (@$configRequiredKeys) {
      # Walk the hash one dot-separated component at a time
      my $node = $config;
      foreach my $part (split /\./, $keyPath) {
         unless (ref($node) eq 'HASH' && exists $node->{$part}) {
            logMsg("Missing required config key: $keyPath");
            $allKeysValid = 0;
            next KEYPATH;
         }
         $node = $node->{$part};
      }

      # Classify the value; empty string means "has a real value"
      my $problem =
           !defined $node                            ? 'undef'
         : ref($node) eq ''     && $node =~ /^\s*$/  ? 'empty/whitespace'
         : ref($node) eq 'ARRAY' && @$node == 0      ? 'empty array'
         : ref($node) eq 'HASH' && keys %$node == 0  ? 'empty hash'
         :                                             '';

      if ($problem) {
         logMsg("Required config key has no value ($problem): $keyPath");
         $allKeysValid = 0;
      }
   }

   return $allKeysValid;
}

## Mount a GELI-encrypted ZFS pool (high-level orchestration).
##
## Arguments:
##   $geliConfig - HASHREF containing GELI/ZFS mounting configuration. Expected keys include:
##       poolname        - name of the zpool to import
##       secureKey       - HASHREF with { label, keyfile, path } describing the keyfile disk
##       target          - path where the combined keyfile will be written
##       diskList        - OPTIONAL arrayref of disk device names (eg: ['ada0','ada1'])
##
## Behavior:
##   - Mounts the keyfile disk (using mountDriveByLabel), builds the combined key (makeGeliKey),
##     then calls decryptAndMountGeli to attach geli devices and import/mount the zpool.
##
## Returns:
##   Pool name (string) on success, empty string on error.
## Mount a GELI-encrypted ZFS pool (high-level orchestration).
## Returns the pool name on success, empty string on any error.
sub mountGeli {
   my $geliConfig = shift;

   logMsg( "geli config detected, attempting to mount geli disks", 1 ) if $verboseLoggingLevel >= 2;
   logMsg( "geliConfig in mountGeli\n" . Dumper($geliConfig), 1 ) if $verboseLoggingLevel >= 4;

   # Can't continue at all if no pool name
   unless ( $geliConfig->{'poolname'} ) {
      logMsg( "Could not find pool name in configuration file\n" );
      return '';
   }

   # Check if the pool is already imported (grep exits 0 on found).
   # -x forces a whole-line match so a pool named 'tank' is not mistaken
   # for 'tank2' or 'mytank'.
   runCmd( "zpool list -H -o name | grep -x $geliConfig->{poolname}" );
   unless ( $lastRunError ) {
      logMsg( "Pool $geliConfig->{poolname} already active" ) if $verboseLoggingLevel >= 1;
      return $geliConfig->{poolname};
   }

   # Find the keyfile disk and mount it; the mount path is recorded back
   # into the config for makeGeliKey to use.
   $geliConfig->{secureKey}->{path} = mountDriveByLabel( $geliConfig->{secureKey} );
   unless ( $geliConfig->{secureKey}->{path} ) {
      logMsg( "Could not find or mount keyfile disk with label: " . $geliConfig->{secureKey}->{label} );
      return '';
   }

   # Create the combined geli keyfile in the target location
   unless ( makeGeliKey( $geliConfig ) ) {
      logMsg( "Could not create geli keyfile\n" );
      return '';
   }

   # Decrypt and mount the geli disks and zfs pool
   return decryptAndMountGeli( $geliConfig );
}

## Discover disks suitable for GELI/ZFS use on the host.
##
## Returns an array of device names (eg: qw( ada0 ada1 )) that appear free for use.
## The routine collects all disks, excludes disks with existing partitions and those
## referenced by active zpools.
sub findGeliDisks {
   logMsg("Finding available disks for GELI/ZFS use", 1) if $verboseLoggingLevel >= 2;

   # Start with every disk geom reports, marked as free (1)
   my %freeDisk;
   foreach my $name ( runCmd( "geom disk list | grep 'Geom name:' | rev | cut -d' ' -f1 | rev" ) ) {
      chomp $name;
      $freeDisk{$name} = 1;
   }

   # Mark disks that already carry a partition table as in use (0).
   # gpart's '=>' header lines name the disk after the offset/size numbers.
   foreach my $line ( runCmd( "gpart show -p | grep '^=>'" ) ) {
      $freeDisk{$1} = 0 if $line =~ m/^=>[\t\s0-9]+([a-z][a-z0-9]+)/;
   }

   # Mark disks that back an active zpool as in use (0)
   foreach my $line ( runCmd( "zpool status -LP | grep '/dev/'" ) ) {
      $freeDisk{$1} = 0 if $line =~ m|/dev/([a-z]+\d+)|;
   }

   # Return only the disks still flagged as free
   return grep { $freeDisk{$_} == 1 } keys %freeDisk;
}

## Decrypt GELI-encrypted disks and import/mount the ZFS pool.
##
## Arguments:
##   $geliConfig - HASHREF expected to contain:
##       poolname - zpool name to import
##       target   - path to the combined GELI keyfile created by makeGeliKey
##       diskList - OPTIONAL arrayref of disk device names (if omitted, findGeliDisks() is used)
##
## Behavior:
##   - Ensures the pool is not already imported
##   - Attaches (geli attach) each supplied disk using the keyfile
##   - Attempts to import the specified pool and runs `zfs mount -a` to mount datasets
##
## Returns:
##   Pool name (string) on success; empty string on failure.
## Decrypt GELI-encrypted disks and import/mount the ZFS pool.
## Expects poolname, target (keyfile path) and optional diskList in $geliConfig.
## Returns the pool name on success, empty string on failure.
sub decryptAndMountGeli {
   my $geliConfig = shift;

   logMsg( "decryptAndMountGeli:\n" . Dumper( $geliConfig ), 1 ) if $verboseLoggingLevel >= 3;

   # If no list of disks provided, try to find them
   $geliConfig->{'diskList'} //= [ findGeliDisks() ];

   my $diskList = $geliConfig->{'diskList'};
   my $poolname = $geliConfig->{'poolname'};
   my $keyfile  = $geliConfig->{'target'};

   # Check if the pool is already imported (grep exits 0 on found).
   # -x forces a whole-line match so similarly named pools don't false-positive.
   runCmd( "zpool list -H -o name | grep -x $poolname" );
   return $poolname unless $lastRunError;

   unless ( -e $keyfile ) {
      logMsg "GELI keyfile $keyfile does not exist\n";
      return '';
   }

   my @decrypted_devices;

   # Decrypt each disk in the list
   foreach my $disk (@$diskList) {
      $disk = '/dev/' . $disk unless $disk =~ m|/dev|;
      unless ( -e $disk ) {
         logMsg "Disk $disk does not exist\n";
         return '';
      }

      # Derive the decrypted device name (.eli suffix on FreeBSD)
      my $decrypted = $disk . '.eli';

      # Decrypt using geli attach with the keyfile (-p: keyfile only, no passphrase)
      logMsg("Decrypting $disk with keyfile $keyfile", 1) if $verboseLoggingLevel >= 2;
      runCmd("geli attach -p -k $keyfile $disk");
      if ( $lastRunError ) {
         logMsg "Failed to decrypt $disk (exit $lastRunError)\n", 1 if $verboseLoggingLevel >= 3;
         next; # ignore failed disks and continue to see if we can import the pool
      }

      unless ( -e $decrypted ) {
         logMsg "Decrypted device $decrypted does not exist after geli attach\n" if $verboseLoggingLevel >= 0;
         return '';
      }
      push @decrypted_devices, $decrypted;
   }

   logMsg( "Attached " . scalar(@decrypted_devices) . " geli device(s)", 1 ) if $verboseLoggingLevel >= 3;

   # Import the ZFS pool
   logMsg("Importing ZFS pool $poolname") if $verboseLoggingLevel >= 0;
   runCmd("zpool import $poolname");
   if ( $lastRunError ) {
      logMsg("Failed to import zfs pool $poolname (exit $lastRunError)\n");
      return '';
   }

   # Mount the ZFS pool (zfs mount -a mounts all filesystems in the pool)
   logMsg("Mounting ZFS pool $poolname") if $verboseLoggingLevel >= 1;
   runCmd('zfs mount -a');
   if ( $lastRunError ) {
      logMsg("Failed to mount zfs pool $poolname (exit $lastRunError)\n");
      return '';
   }

   logMsg("Successfully decrypted and mounted pool $poolname", 1) if $verboseLoggingLevel >= 2;
   return $poolname;
}

## Create a GELI key by XOR'ing a remote binary keyfile with a local hex key.
##
## Expected input (via $geliConfig HASHREF):
##   $geliConfig->{secureKey}->{path} - directory where the remote keyfile resides
##   $geliConfig->{secureKey}->{keyfile} - filename of the remote binary key
##   $geliConfig->{localKey} - hex string OR path to a file containing the hex;
##       after cleanup it must decode to the same byte length as the remote key
##   $geliConfig->{target} - path to write the resulting binary key
##
## Behavior:
##   - Reads the entire remote binary key; its length determines the key size
##     (minimum 128 bits)
##   - Reads/cleans the local hex key and converts it to binary of matching length
##   - XORs the two buffers and writes the result to $target with mode 0600
##
## Returns: 1 on success. Dies on unrecoverable errors.
sub makeGeliKey {
   my ( $geliConfig ) = @_;

   $geliConfig->{secureKey}->{keyfile} //= '';
   $geliConfig->{localKey} //= '';
   $geliConfig->{secureKey}->{path} //= '';
   $geliConfig->{target} //= '';

   # Never overwrite an existing combined keyfile
   if ( $geliConfig->{target} && -f $geliConfig->{target} ) {
      logMsg "GELI target keyfile $geliConfig->{target} already exists. Not overwriting.\n", 1 if $verboseLoggingLevel >= 2;
      return 1;
   }

   # Fail early with clear messages when mandatory inputs are missing;
   # previously the code fell through and died later with a confusing
   # "Unable to open :" message.
   die "No secureKey keyfile defined in GELI configuration\n" unless $geliConfig->{secureKey}->{keyfile};
   die "No localKey defined in GELI configuration\n" unless $geliConfig->{localKey};

   my $remote_keyfile = ($geliConfig->{secureKey}->{path} ? "$geliConfig->{secureKey}->{path}/" : '' ) . $geliConfig->{secureKey}->{keyfile};
   my $localKeyHexOrPath = $geliConfig->{localKey};
   my $target = $geliConfig->{target};

   die "Remote keyfile '$remote_keyfile' does not exist\n" unless -f $remote_keyfile;
   logMsg "Creating GELI keyfile at $target using remote keyfile $remote_keyfile and local key\n"
      if $verboseLoggingLevel >= 2;

   # Read the entire remote binary key; its length determines the key size
   my $rbuf = do {
      open my $rh, '<:raw', $remote_keyfile or die "Unable to open $remote_keyfile: $!\n";
      local $/;          # slurp mode, scoped to this block
      my $data = <$rh>;
      close $rh;
      $data;
   };
   die "Failed to read from $remote_keyfile\n" unless defined $rbuf;

   my $keySizeBytes = length($rbuf);
   my $keySizeBits  = $keySizeBytes * 8;

   # Validate key size. (A divisible-by-8 check on $keySizeBits is unnecessary:
   # bits are derived as bytes * 8, so they are always a multiple of 8.)
   die "Remote key size must be at least 128 bits (got $keySizeBits bits)\n" if $keySizeBits < 128;

   logMsg "Using GELI key size: $keySizeBits bits ($keySizeBytes bytes)\n", 1 if $verboseLoggingLevel >= 3;

   # Get local hex string (either direct string or file contents)
   my $hex;
   if (-e $localKeyHexOrPath) {
      open my $lh, '<', $localKeyHexOrPath or die "Unable to open local key file $localKeyHexOrPath: $!\n";
      local $/ = undef;
      $hex = <$lh>;
      close $lh;
   } else {
      $hex = $localKeyHexOrPath;
   }
   # Clean hex (remove optional 0x prefixes, whitespace and any non-hex characters)
   $hex =~ s/0x//g;
   $hex =~ s/[^0-9a-fA-F]//g;

   my $expectedHexLength = $keySizeBytes * 2;  # 2 hex chars per byte
   die "Local key must be $expectedHexLength hex characters ($keySizeBits-bit), got " . length($hex) . " characters\n"
      unless length($hex) == $expectedHexLength;

   my $lbuf = pack('H*', $hex);
   die "Local key decoded to unexpected length " . length($lbuf) . " (expected $keySizeBytes)\n"
      unless length($lbuf) == $keySizeBytes;

   # XOR the two buffers byte by byte to produce the combined key
   my $out = '';
   for my $i (0 .. $keySizeBytes - 1) {
      $out .= chr( ord(substr($rbuf, $i, 1)) ^ ord(substr($lbuf, $i, 1)) );
   }

   # Ensure the target directory exists.
   # BUGFIX: the previous regex captured the leading '/' separately from the
   # directory portion, so an absolute target like '/etc/geli/key' caused
   # make_path('etc/geli') -- creating a RELATIVE directory. Derive the
   # directory from the full path instead.
   if ( $target =~ m{^(.+)/[^/]+$} ) {
      my $dir = $1;
      unless ( -d $dir ) {
         make_path($dir);   # make_path dies on error by default
         die "Failed to create directory $dir\n" unless -d $dir;
      }
   }

   # Write out binary key and protect permissions
   open my $oh, '>:raw', $target or die "Unable to open $target for writing: $!\n";
   print $oh $out or die "Failed to write to $target: $!\n";
   close $oh or die "Failed to close $target: $!\n";
   chmod 0600, $target;

   return 1;
}

# make a bunch of replicate commands and return them to the caller as a hashref
# $sourceSnapsRef - list of snapshots on source machine
# $targetSnapsRef - list of snapshots on target machine
# $dataset - The name of the dataset we are working on (same on both source and target)
# $sourceParent - The parent dataset of $dataset on source
# $targetParent - The parent dataset of $dataset on target
# $newStatusRef - A place to put the updated $targetSnapsRef
# returns hashref of commands to execute, of form
#    {$dataset} = "zfs send command"
# where $dataset above can be a child of $dataset
sub makeReplicateCommands {
   my ( $sourceSnapsRef, $targetSnapsRef, $dataset, $sourceParent, $targetParent, $newStatusRef ) = @_;

   # Ensure all array refs are defined (use empty arrays if not provided)
   $sourceSnapsRef ||= [];
   $targetSnapsRef ||= [];
   $newStatusRef   ||= [];

   # Normalize parent paths: ensure they end with '/' unless empty
   # This makes path construction consistent later (e.g., "pool/" + "dataset")
   $sourceParent //= '';
   $sourceParent .= '/' unless $sourceParent eq '' or substr($sourceParent, -1) eq '/';
   $targetParent //= '';
   $targetParent .= '/' unless $targetParent eq '' or substr($targetParent, -1) eq '/';

   logMsg( "dataset=[$dataset] sourceParent=[$sourceParent] targetParent=[$targetParent]" )
     if $verboseLoggingLevel >= 4;
   logMsg( "source snapshots count=" . scalar(@$sourceSnapsRef) . ", target snapshots count=" . scalar(@$targetSnapsRef) )
     if $verboseLoggingLevel >= 4;
   if ($verboseLoggingLevel >= 5) {
      logMsg( "RAW target snapshots BEFORE filtering:" );
      foreach my $snap (@$targetSnapsRef) {
         logMsg( "  [$snap]" );
      }
      logMsg( "RAW source snapshots BEFORE filtering:" );
      foreach my $snap (@$sourceSnapsRef) {
         logMsg( "  [$snap]" );
      }
   }

   my %commands; # Hash to store generated zfs send commands, keyed by filesystem name

   fatalError( "No dataset defined in makeReplicateCommands, can not continue") unless $dataset;

   # Filter snapshot lists to only include snapshots matching our dataset and its children
   # The dataset should match as a full path component (not substring)
   # Then strip the parent path prefix from each snapshot name
   # Example: "storage/mydata@snap1" becomes "mydata@snap1" when sourceParent="storage/"
   # This allows us to work with relative paths and handle different parent paths on source/target
   # Match: storage/mydata@snap, storage/mydata/child@snap
   # Don't match: storage/mydataset@snap (if dataset is "mydata")
   # Don't match: storage/otherparent/mydata@snap (different parent path)
   # NOTE: \Q...\E quotes the parent when stripping it too, so regex metachars
   # in dataset paths (eg '.') cannot corrupt the match.
   my $targetSnaps = [ map{ s/^\Q$targetParent\E//r } grep{ /^\Q$targetParent$dataset\E(?:\/|@)/ } @$targetSnapsRef ];
   my $sourceSnaps = [ map{ s/^\Q$sourceParent\E//r } grep{ /^\Q$sourceParent$dataset\E(?:\/|@)/ } @$sourceSnapsRef ];

   logMsg( "filtered source snapshots count=" . scalar(@$sourceSnaps) . ", filtered target snapshots count=" . scalar(@$targetSnaps), 1 ) if $verboseLoggingLevel >= 4;
   logMsg( "filtered source snapshots: " . join(', ', sort @$sourceSnaps), 1 ) if $verboseLoggingLevel >= 5;
   logMsg( "filtered target snapshots: " . join(', ', sort @$targetSnaps), 1 ) if $verboseLoggingLevel >= 5;

   # Parse source snapshots to build a hash indexed by filesystem
   # Input lines may have format: "pool/fs@snapshot extra data"
   # We extract just the first token (pool/fs@snapshot) and split it into filesystem and snapshot name
   # Result: %snaps_by_fs = { "pool/fs" => ["snap1", "snap2", ...] }
   # This groups all snapshots by their parent filesystem
   my %snaps_by_fs;
   foreach my $line (@$sourceSnaps) {
      next unless defined $line && $line =~ /\S/;  # Skip empty lines
      my ($tok) = split /\s+/, $line;              # Get first token
      next unless $tok && $tok =~ /@/;             # Must contain @ separator
      my ($fs, $snap) = split /@/, $tok, 2;        # Split into filesystem and snapshot name
      push @{ $snaps_by_fs{$fs} }, $snap;          # Add snapshot to this filesystem's list
   }

   logMsg( "parsed filesystems: " . join(', ', sort keys %snaps_by_fs), 1 ) if $verboseLoggingLevel >= 4;

   # If no snapshots were found there is nothing to replicate.
   # BUGFIX: return an empty HASHREF (not arrayref) so the return type is
   # always consistent with the documented "hashref of commands" contract.
   return {} unless keys %snaps_by_fs;

   # Determine the root filesystem for recursive operations.
   # Use the filesystem of the first non-empty snapshot line; fall back to the
   # first sorted filesystem name. The root filesystem is used when we can do
   # a single recursive send instead of multiple sends.
   my ($first_line) = grep { defined $_ && $_ =~ /\S/ } @$sourceSnaps;
   my $root_fs;
   if ($first_line) {
      my ($tok) = split /\s+/, $first_line;
      ($root_fs) = split /@/, $tok, 2 if $tok && $tok =~ /@/;
   }
   $root_fs ||= (sort keys %snaps_by_fs)[0];

   # Build a hash of the most recent snapshot on target for each filesystem
   # This tells us what's already been replicated, so we can do incremental sends
   # If a filesystem isn't in this hash, we need to do a full (non-incremental) send
   # Note: If multiple snapshots exist for a filesystem, we keep only the last one
   # (later entries override earlier ones in the hash assignment)
   my %last_status_for;
   for my $s (@$targetSnaps) {
      next unless $s && $s =~ /@/;
      my ($fs, $snap) = split /@/, $s, 2;
      $last_status_for{$fs} = $snap;    # later entries override earlier ones -> last occurrence kept
   }

   if ($verboseLoggingLevel >= 4) {
      logMsg( "last status snapshots:" );
      for my $fs (sort keys %last_status_for) {
         logMsg( "  $fs => $last_status_for{$fs}" );
      }
   }

   # Build "from" and "to" snapshot mappings for each filesystem
   # "to" = the newest snapshot on source (what we want to send)
   # "from" = the last replicated snapshot on target (what we're sending from)
   # If "from" is undef, this filesystem hasn't been replicated before -> full send needed
   # Example: from="daily-2025-12-15" to="daily-2025-12-17" -> incremental send
   #          from=undef to="daily-2025-12-17" -> full send
   # NOTE: The "from" snapshot must exist in the source's snapshot list for incremental send
   #       If it doesn't exist in source, we need to find a common snapshot or do full send
   my %from_for;
   my %to_for;
   foreach my $fs (keys %snaps_by_fs) {
      my $arr = $snaps_by_fs{$fs};              # Get all snapshots for this filesystem
      next unless @$arr;                        # Skip if no snapshots
      $to_for{$fs} = $arr->[-1];                # Last element = newest snapshot to send

      # Check if the target's last status snapshot exists in the source list
      # If it does, we can do incremental send from that point
      # If it doesn't, the target may have a snapshot the source doesn't have anymore
      my $target_last = $last_status_for{$fs};
      if (defined $target_last && grep { $_ eq $target_last } @$arr) {
         $from_for{$fs} = $target_last;         # Use target's last snapshot as "from"
      } else {
         # Target's snapshot doesn't exist in source list - need full send
         $from_for{$fs} = undef;
      }
   }

   if ($verboseLoggingLevel >= 4) {
      logMsg( "from/to mapping:" );
      for my $fs (sort keys %to_for) {
         my $from = $from_for{$fs} // '(none - full send)';
         my $send_type = defined $from_for{$fs} ? 'incremental' : 'full';
         logMsg( "  $fs: from=$from to=$to_for{$fs} [$send_type]" );
      }
   }

   # Optimization check: Can we do a single recursive send?
   # Recursive sends are more efficient when replicating entire filesystem hierarchies
   # Condition: all filesystems must be sending to the same-named snapshot
   # Example: If pool/data@daily-2025-12-17 and pool/data/child@daily-2025-12-17 exist,
   #          we can do "zfs send -R pool/data@daily-2025-12-17" instead of two separate sends
   my %to_names = map { $_ => 1 } values %to_for;  # Get unique "to" snapshot names
   my $single_to_name = (keys %to_names == 1) ? (keys %to_names)[0] : undef;

   logMsg( "single_to_name=" . ($single_to_name // '(none - varied snapshots)'), 1 ) if $verboseLoggingLevel >= 4;

   if ($single_to_name) {
      # All filesystems are targeting the same snapshot name
      # Now check if we can use incremental recursive send or need full send
      my @from_values = map { $from_for{$_} } sort keys %from_for;
      my $any_from_missing = grep { !defined $_ } @from_values;  # Any filesystem not yet replicated?
      my %from_names = map { $_ => 1 } grep { defined $_ } @from_values;  # Unique "from" names
      my $single_from_name = (keys %from_names == 1) ? (keys %from_names)[0] : undef;

      logMsg( "single_from_name=" . ($single_from_name // '(none)') . ", any_from_missing=$any_from_missing", 1 ) if $verboseLoggingLevel >= 4;

      if ($any_from_missing) {
         # At least one filesystem has never been replicated (from=undef)
         # Check if the ROOT filesystem has been replicated - if not, must do full recursive send
         # If only children are missing, we can still do per-filesystem sends with incrementals where possible
         if (!defined $from_for{$root_fs}) {
            # Root filesystem has never been replicated - must do full recursive send
            # Command: zfs send -R pool/dataset@snapshot
            logMsg( "generating full recursive send (root filesystem has no prior snapshot)", 1 ) if $verboseLoggingLevel >= 4;
            $commands{$root_fs} = sprintf('zfs send -R %s%s@%s', $sourceParent, $root_fs, $single_to_name);
         } else {
            # Root has been replicated, but some children haven't - do per-filesystem sends
            # This allows incremental sends for filesystems that have prior snapshots
            logMsg( "root replicated but some children missing - using per-filesystem sends", 1 ) if $verboseLoggingLevel >= 4;
            foreach my $fs (sort keys %to_for) {
               my $to  = $to_for{$fs};
               my $from = $from_for{$fs};
               if ($from) {
                  # Incremental send for this filesystem
                  $commands{$fs} = sprintf('zfs send -I %s%s@%s %s%s@%s', $sourceParent, $fs, $from, $sourceParent, $fs, $to)
                     unless $from eq $to;
               } else {
                  # Full send for this filesystem (never replicated before)
                  logMsg( "$fs - full send (no prior snapshot)", 1 ) if $verboseLoggingLevel >= 4;
                  $commands{$fs} = sprintf('zfs send %s%s@%s', $sourceParent, $fs, $to);
               }
            }
         }
      }
      elsif ($single_from_name) {
         # All filesystems have been replicated AND they all have the same "from" snapshot
         # Perfect case for incremental recursive send
         # Command: zfs send -R -I pool/dataset@old pool/dataset@new
         if ($single_from_name eq $single_to_name) {
            # Source and target are already identical - nothing to send
            logMsg( "from and to snapshots are identical ($single_from_name) - no send needed", 1 ) if $verboseLoggingLevel >= 4;
         } else {
            logMsg( "generating incremental recursive send from $single_from_name to $single_to_name", 1 ) if $verboseLoggingLevel >= 4;
            $commands{$root_fs} = sprintf('zfs send -R -I %s%s@%s %s%s@%s',
                           $sourceParent, $root_fs, $single_from_name, $sourceParent, $root_fs, $single_to_name);
         }
      }
      else {
         # Filesystems have different "from" snapshots - can't use single recursive send
         # Fall back to individual per-filesystem sends
         logMsg( "from snapshots differ across children - using per-filesystem sends", 1 ) if $verboseLoggingLevel >= 4;
         foreach my $fs (sort keys %to_for) {
            my $to  = $to_for{$fs};
            my $from = $from_for{$fs};
            if ($from) {
               # Incremental send: send all intermediate snapshots from "from" to "to"
               # Skip if from and to are identical (already up to date)
               $commands{$fs} = sprintf('zfs send -I %s%s@%s %s%s@%s', $sourceParent, $fs, $from, $sourceParent, $fs, $to)
                  unless $from eq $to;
            } else {
               # Full send: no prior snapshot on target, send everything
               $commands{$fs} = sprintf('zfs send %s%s@%s', $sourceParent, $fs, $to);
            }
         }
      }

      # Update the status array with the new target snapshots
      # This will be written to the status file for tracking what's been replicated
      # Format: targetParent/filesystem@snapshot
      foreach my $fs (keys %to_for) {
         push @$newStatusRef, sprintf('%s%s@%s', $targetParent, $fs, $to_for{$fs});
      }
      logMsg( "added " . scalar(keys %to_for) . " entries to new status", 1 ) if $verboseLoggingLevel >= 4;
   } else {
      # Filesystems have different "to" snapshot names - can't use recursive send
      # Must send each filesystem individually
      # This handles cases like:
      #   - Parent: pool/data@daily-2025-12-17
      #   - Child:  pool/data/child@hourly-2025-12-17-14
      # Each filesystem can still do incremental sends to its own target snapshot
      logMsg( "varied 'to' snapshots - using per-filesystem sends (each may be incremental)", 1 ) if $verboseLoggingLevel >= 4;
      foreach my $fs (sort keys %to_for) {
         my $to  = $to_for{$fs};
         my $from = $from_for{$fs};
         if ($from) {
            # Incremental send for this filesystem to its specific target snapshot
            # Command: zfs send -I pool/fs@old pool/fs@new
            # Note: "old" and "new" are specific to this filesystem, not necessarily matching parent
            logMsg( "$fs - incremental send from $from to $to", 1 ) if $verboseLoggingLevel >= 4;
            $commands{$fs} = sprintf('zfs send -I %s%s@%s %s%s@%s', $sourceParent, $fs, $from, $sourceParent, $fs, $to);
         } else {
            # Full send for this filesystem (never replicated before, or target snap not in source)
            # Command: zfs send pool/fs@snapshot
            logMsg( "$fs - full send to $to (no common snapshot)", 1 ) if $verboseLoggingLevel >= 4;
            $commands{$fs} = sprintf('zfs send %s%s@%s', $sourceParent, $fs, $to);
         }
         # Add to status tracking
         push @$newStatusRef, sprintf('%s%s@%s', $targetParent, $fs, $to);
      }
      logMsg( "added " . scalar(keys %to_for) . " entries to new status", 1 ) if $verboseLoggingLevel >= 4;
   }

   logMsg( "generated " . scalar(keys %commands) . " commands", 1 ) if $verboseLoggingLevel >= 4;

   # Return hash reference of commands: { "filesystem" => "zfs send command" }
   # Caller will typically pipe these to "zfs receive" on target
   return \%commands;
}

# Send report via email and/or copy to target drive.
# $reportConfig is a hashref with optional keys:
#   email - email address to send report to
#   targetDrive - hashref with keys:
#       label - GPT or msdosfs label of the target drive
#       mountPoint - optional mount point to use (if not provided, /mnt/label is used)
# $message is the message to include in the email body
# $logFile is the path to the log file to include in the report
#          (defaults to $reportConfig->{logFile} when not supplied)
# The email subject is taken from $reportConfig->{subject} when set.
sub sendReport {
   my ( $reportConfig, $message, $logFile ) = @_;
   return unless defined $reportConfig;
   $logFile //= $reportConfig->{logFile};

   logMsg( "Beginning sendReport" ) if $verboseLoggingLevel >= 0;

   # if targetDrive is defined and has a non-empty label, try to mount it and write the report there
   if ( defined $reportConfig->{targetDrive} && $reportConfig->{targetDrive}->{label} ) {
      logMsg( "Saving report to disk with label $reportConfig->{targetDrive}->{label}", 1 ) if $verboseLoggingLevel >= 2;
      if ( $reportConfig->{targetDrive}->{mountPath} = mountDriveByLabel( $reportConfig->{targetDrive} ) ) {
         copyReportToDrive( $logFile, $reportConfig->{targetDrive}->{mountPath} );
         unmountDriveByLabel( $reportConfig->{targetDrive} );
      } else {
         logMsg( "Warning: could not mount report target drive with label '$reportConfig->{targetDrive}->{label}'" ) if $verboseLoggingLevel >= 1;
      }
   }

   # if they have set an e-mail address, try to e-mail the report
   if ( defined $reportConfig->{email} && $reportConfig->{email} ne '' ) {
      logMsg( "Sending report via e-mail to $reportConfig->{email}" ) if $verboseLoggingLevel >= 1;
      unless ( defined $reportConfig->{subject} ) {
         # BUGFIX: `hostname` ends with a newline; chomp it so the default
         # subject does not embed a newline into the Subject: email header
         my $hostname = `hostname`;
         chomp $hostname;
         $reportConfig->{subject} = "Replication Report from $hostname";
      }
      sendEmailReport( $reportConfig->{email}, $reportConfig->{subject}, $message, $logFile );
   }
}

## Copy the report log file to a mounted target drive with timestamped filename.
##
## Arguments:
##   $logFile    - path to the log file to copy (must exist)
##   $mountPoint - mount point of the target drive (must be a directory)
##
## Behavior:
##   - Prepends timestamp (YYYY-MM-DD_HH-MM-SS) to filename before copying
##   - Example: sneakernet.log becomes 2026-01-17_14-30-45_sneakernet.log
##   - Prevents overwriting previous reports on the same drive
##   - Copies the log file into the root of $mountPoint using File::Copy::copy
##   - Logs success/failure via logMsg
sub copyReportToDrive {
   my ( $logFile, $mountPoint ) = @_;

   # Nothing to do without an existing log file and a mounted target directory
   return unless defined $logFile && -e $logFile;
   return unless defined $mountPoint && -d $mountPoint;

   use File::Copy;

   # Build a timestamped destination name (YYYY-MM-DD_HH-MM-SS_<basename>)
   # so reports never overwrite earlier ones on the same drive
   my $stamp    = POSIX::strftime( "%Y-%m-%d_%H-%M-%S", localtime() );
   my @parts    = split /\//, $logFile;
   my $basename = $parts[-1];
   my $destination = "$mountPoint/${stamp}_${basename}";

   logMsg( "Copying report log file $logFile to drive at $mountPoint as ${stamp}_${basename}", 1 ) if $verboseLoggingLevel >= 2;

   unless ( copy( $logFile, $destination ) ) {
      logMsg( "Could not copy report log file to target drive: $!" ) if $verboseLoggingLevel >= 0;
   }
}

## Send an email report with an attached log body.
##
## Arguments:
##   $to      - recipient email address (string)
##   $subject - subject line (string)
##   $message - optional message body (string)
##   $logFile - optional path to log file whose contents will be appended to the email body
##
## Behavior:
##   - Opens /usr/sbin/sendmail -t and writes a simple plain-text email including the
##     supplied message and the contents of $logFile (if present).
##   - Logs failures to open sendmail or read the log file.
sub sendEmailReport {
   my ( $to, $subject, $message, $logFile ) = @_;
   return unless defined $to && $to ne '';

   unless ( defined $subject ) {
      # BUGFIX: `hostname` ends with a newline; chomp it so the Subject:
      # header is not split across two lines (which corrupts the headers)
      my $hostname = `hostname`;
      chomp $hostname;
      $subject = "Sneakernet Replication Report from $hostname";
   }
   $message //= '';
   $logFile //= '';

   logMsg( "Sending email report to $to with subject '$subject'", 1 ) if $verboseLoggingLevel >= 2;

   open my $mailfh, '|-', '/usr/sbin/sendmail -t' or do {
      logMsg( "Could not open sendmail: $!" ) if $verboseLoggingLevel >= 0;
      return;
   };

   # Headers (sendmail -t reads the recipient from the To: header)
   print $mailfh "To: $to\n";
   print $mailfh "Subject: $subject\n";
   print $mailfh "MIME-Version: 1.0\n";
   print $mailfh "Content-Type: text/plain; charset=\"utf-8\"\n";
   print $mailfh "\n"; # end of headers

   # Body: caller's message, then the log file contents when readable
   print $mailfh "$message\n";
   print $mailfh "\nLog contents:\n\n";
   if ( -e $logFile && open my $logfh, '<', $logFile ) {
      while ( my $line = <$logfh> ) {
         print $mailfh $line;
      }
      close $logfh;
   } else {
      logMsg( "Could not open log file [$logFile] for reading: $!" ) if $verboseLoggingLevel >= 0;
   }

   # Check the pipe close: buffered write errors and sendmail's exit status
   # only surface here
   unless ( close $mailfh ) {
      logMsg( "sendmail pipe close failed (exit $?): $!" ) if $verboseLoggingLevel >= 0;
   }
}

## Return list of regular files in a directory (non-recursive).
##
## Arguments:
##   $dirname - directory to scan
##   $filter  - OPTIONAL regex pattern to filter filenames (applied after full paths are built)
##   $invertFilter - OPTIONAL boolean to invert the filter logic (if true, files NOT matching the pattern are returned)
## Behavior:
##   - Opens the specified directory and reads all entries.
##   - Filters to include only regular files (no directories, symlinks, etc).
##   - If a filter pattern is provided, further filters the list to include only files
##     matching the pattern (or not matching if $invertFilter is true).
## Returns: ARRAYREF of full-path filenames on success, 0 on error (matching prior behavior).
sub getDirectoryList {
   my ( $dirname, $filter, $invertFilter ) = @_;
   $filter       //= '';
   $invertFilter //= 0;

   # 0 on unreadable/missing directory (matches historical behavior)
   opendir( my $dh, $dirname ) or return 0;
   my @entries = readdir $dh;
   closedir $dh;

   # Keep regular files only, expanded to full paths; directories
   # (including '.' and '..') and other entry types are skipped
   my @files;
   foreach my $entry (@entries) {
      my $path = "$dirname/$entry";
      push @files, $path if -f $path;
   }

   # Apply the optional regex filter against the full paths
   if ( $filter ne '' ) {
      @files = $invertFilter
             ? grep { $_ !~ /$filter/ } @files
             : grep { $_ =~ /$filter/ } @files;
   }

   return \@files;
}

## Remove all regular files from the specified directory (non-recursive).
##
## Arguments:
##   $dirname - directory to clean
##
## Behavior:
##   - Calls getDirectoryList to obtain files and unlinks each file. Directories are left untouched.
##   - Logs the cleanup operation via logMsg.
##
## Returns: 1 on completion. Note: individual unlink failures are currently reported via warn.
sub cleanDirectory {
   my $dirname = shift;
   logMsg( "Cleaning up $dirname of all files", 1 ) if $verboseLoggingLevel >= 2;
   return 1 unless -d $dirname;

   # getDirectoryList returns 0 when the directory can't be opened;
   # guard before dereferencing so we don't die on a bad arrayref
   my $files = getDirectoryList( $dirname );
   return 1 unless ref $files eq 'ARRAY';

   # Remove every regular file; directories are left untouched
   foreach my $file (@$files) {
      # BUGFIX: the warn message used '#!' instead of '$!', hiding the OS error
      unlink $file or warn "Could not unlink $file: $!\n";
   }
   return 1;
}

## Handle a fatal error: log, optionally run a cleanup routine, then die.
##
## Arguments:
##   $message        - string message describing the fatal condition
##   $config         - OPTIONAL configuration HASHREF (passed to cleanupRoutine)
##   $cleanupRoutine - OPTIONAL CODE ref to run prior to dying; will be called as
##                     $cleanupRoutine->($config, $message)
##
## Behavior:
##   - Logs the fatal message via logMsg, runs the cleanup code if provided (errors in the cleanup
##     are logged), then terminates the process via die.
sub fatalError {
   my ( $message, $config, $cleanupRoutine ) = @_;
   logMsg( "FATAL ERROR: $message" ) if $verboseLoggingLevel >= 0;
   # Run the caller-supplied cleanup code (if any) before terminating; a
   # failing cleanup is logged but must not prevent the fatal exit.
   if ( defined $cleanupRoutine && ref $cleanupRoutine eq 'CODE' ) {
      logMsg( "Running cleanup routine before fatal error", 1 ) if $verboseLoggingLevel >= 2;
      eval {
         $cleanupRoutine->( $config, $message );
         1;
      } or do {
         logMsg( "Cleanup routine failed: $@" ) if $verboseLoggingLevel >= 0;
      };
   }
   # BUG FIX: the original used a bare `die;`, which discards the fatal
   # message (and, when the cleanup eval failed, re-propagates the stale $@
   # instead). Die with the actual message so callers and logs see the cause.
   die "FATAL ERROR: $message\n";
}

## Parse a date/time string from a snapshot name and return epoch time.
##
## Arguments:
##   $snapname - snapshot name (the part after @) to parse
##
## Behavior:
##   - Extracts date/time string matching pattern YYYY-MM-DD or YYYY-MM-DD_HH:MM:SS variants
##   - Accepts multiple separator formats (T, _, space for date/time; : or . for time components)
##   - Tries multiple date/time format patterns until one parses successfully
##   - Returns undef if no date found or parsing fails
##
## Returns:
##   Epoch time (integer) on success, undef on failure
##
## Supported formats:
##   YYYY-MM-DD
##   YYYY-MM-DD HH:MM:SS
##   YYYY-MM-DDTHH:MM:SS
##   YYYY-MM-DD_HH:MM:SS
##   YYYY-MM-DD_HH.MM.SS
##   YYYY-MM-DD_HH.MM
##   (with optional prefix/suffix text around the date)
## (see header above for the full list of supported snapshot-name formats)
sub parseSnapshotDateTime {
   my ($snapname) = @_;

   return undef unless defined $snapname;

   # Extract a date (with optional time) from the snapshot name.
   # Accepts YYYY-MM-DD, optionally followed by a time separated by
   # 'T', '_' or space, where time components use ':' or '.' and the
   # seconds field is optional. Optional prefix/suffix text is ignored.
   my ($date_str) =
      $snapname =~ /(\d{4}-\d{2}-\d{2}(?:[T _]\d{2}[:\.]\d{2}(?:[:\.]\d{2})?)?)/;
   return undef unless defined $date_str;

   # Normalize separators so a single small set of formats covers every
   # accepted variant: 'T'/'_' date-time separator -> space, and dotted
   # time components (HH.MM[.SS]) -> colon-separated. The captured string
   # contains at most one date-time separator, so a single substitution
   # is sufficient.
   my $parse_date = $date_str;
   $parse_date =~ s/[T_]/ /;
   $parse_date =~ s/\./:/g if $parse_date =~ /\d{2}\.\d{2}(?:\.\d{2})?/;

   # After normalization only these three shapes can occur. (The original
   # list also contained 'T'-, '_'- and '.'-separated formats, which were
   # unreachable once the separators had been normalized above.)
   my @fmts = (
      '%Y-%m-%d %H:%M:%S',    # full date-time with seconds
      '%Y-%m-%d %H:%M',       # date-time without seconds
      '%Y-%m-%d',             # date only
   );

   # Load Time::Piece once per call instead of once per loop iteration.
   require Time::Piece;
   foreach my $fmt (@fmts) {
      # strptime dies on a non-matching format; trap and try the next one.
      my $epoch = eval { Time::Piece->strptime( $parse_date, $fmt )->epoch };
      return $epoch if defined $epoch;
   }

   # No format matched
   return undef;
}

## Find the latest snapshot for each filesystem in a dataset hierarchy based on date/time.
##
## Arguments:
##   $snapshotsRef - ARRAYREF of snapshot names (format: "pool/fs@snapname" or just snapshot lines)
##
## Behavior:
##   - Parses each snapshot name to extract filesystem and snapshot name
##   - For each snapshot, attempts to extract a date/time stamp from the snapshot name
##   - Accepts formats: YYYY-MM-DD, YYYY-MM-DD_HH:MM:SS, YYYY-MM-DD_HH.MM.SS, etc.
##   - Returns the snapshot with the latest date/time for each filesystem
##
## Returns:
##   ARRAYREF of snapshot names (full "pool/fs@snapname" format), one per filesystem
##   containing the latest snapshot based on parsed date/time
##
## Example:
##   Input: ["pool/data@2025-01-01", "pool/data@2025-12-01", "pool/data/child@2025-06-01"]
##   Output: ["pool/data@2025-12-01", "pool/data/child@2025-06-01"]
sub getLatestSnapshots {
   my ($snapshotsRef) = @_;

   $snapshotsRef ||= [];

   logMsg( "processing " . scalar(@$snapshotsRef) . " snapshots", 1 ) if $verboseLoggingLevel >= 4;

   # Single pass: for each filesystem remember only the snapshot seen so far
   # with the greatest parsed epoch, instead of collecting and sorting later.
   my %best_for;   # { "pool/fs" => { epoch => 12345, full => "pool/fs@snapname" } }

   foreach my $entry (@$snapshotsRef) {
      next unless defined $entry && $entry =~ /\S/;
      my ($name) = split /\s+/, $entry;   # first whitespace-delimited token
      next unless $name && $name =~ /@/;

      my ($fs, $snaplabel) = split /@/, $name, 2;

      # Extract a date/time from the snapshot label; entries without a
      # parseable date are skipped entirely.
      my $epoch = parseSnapshotDateTime($snaplabel);
      unless (defined $epoch) {
         logMsg( "no date found or parse failed in snapshot: $name", 1 ) if $verboseLoggingLevel >= 5;
         next;
      }

      logMsg( "$name -> epoch $epoch", 1 ) if $verboseLoggingLevel >= 5;

      # Strict '>' keeps the earliest-listed snapshot on epoch ties, which
      # matches taking the first element of a stable descending sort.
      if ( !exists $best_for{$fs} || $epoch > $best_for{$fs}{epoch} ) {
         $best_for{$fs} = { epoch => $epoch, full => $name };
      }
   }

   # Emit one winner per filesystem, in sorted filesystem order.
   my @latest;
   foreach my $fs (sort keys %best_for) {
      push @latest, $best_for{$fs}{full};
      logMsg( "$fs -> latest: $best_for{$fs}{full} (epoch $best_for{$fs}{epoch})", 1 ) if $verboseLoggingLevel >= 4;
   }

   logMsg( "returning " . scalar(@latest) . " latest snapshots", 1 ) if $verboseLoggingLevel >= 4;

   return \@latest;
}

## snapShotReport: Generate a report comparing original and current snapshot states
##
## Parameters:
##   $dataset - The root dataset name (string)
##   $originalSnapshotsRef - Arrayref of original latest snapshots (one per filesystem)
##   $currentSnapshotsRef - Arrayref of all current snapshots after modification
##
## Returns:
##   Arrayref of report lines (strings) showing changes per filesystem
##
## Report flags:
##   CONSISTENT - Parent and all children have identical snapshot changes
##   ADDED - New dataset/filesystem added
##   UNCHANGED - No change between original and current
##   (no flag) - Changed, showing original -> current and count
##
## Example output lines:
##   "pool/data: CONSISTENT - original: pool/data\@2025-12-18, current: pool/data\@2025-12-19 (1 snapshot added)"
##   "pool/data/child: ADDED - current: pool/data/child\@2025-12-19 (no original snapshot)"
##   "pool/backup: UNCHANGED - pool/backup\@2025-12-18"
sub snapShotReport {
   my ($dataset, $originalSnapshotsRef, $currentSnapshotsRef) = @_;
   
   # Default both lists so the loops below can dereference safely
   $originalSnapshotsRef ||= [];
   $currentSnapshotsRef ||= [];
   
   logMsg( "generating report for dataset: $dataset", 1 ) if $verboseLoggingLevel >= 4;
   logMsg( "original snapshots: " . scalar(@$originalSnapshotsRef), 1 ) if $verboseLoggingLevel >= 5;
   logMsg( "current snapshots: " . scalar(@$currentSnapshotsRef), 1 ) if $verboseLoggingLevel >= 5;
   
   # Parse original snapshots - these should be latest snapshots per filesystem
   # (a later entry for the same filesystem simply overwrites the earlier one)
   my %original;  # { "pool/data" => "pool/data\@2025-12-18", ... }
   foreach my $snap (@$originalSnapshotsRef) {
      next unless defined $snap && $snap =~ /\@/;
      my ($fs, $snapname) = split /\@/, $snap, 2;
      $original{$fs} = $snap;
   }
   
   # Parse current snapshots and find latest per filesystem
   my %current_latest;  # { "pool/data" => { snap => "2025-12-19", epoch => 123456, full => "pool/data\@2025-12-19" }, ... }
   my %current_all;     # { "pool/data" => ["snap1", "snap2", ...] }
   
   foreach my $snap (@$currentSnapshotsRef) {
      next unless defined $snap && $snap =~ /\@/;
      my ($tok) = split /\s+/, $snap;  # Get first token
      next unless $tok && $tok =~ /\@/;
      
      my ($fs, $snapname) = split /\@/, $tok, 2;
      
      # Track all snapshots for this filesystem
      push @{$current_all{$fs}}, $snapname;
      
      # Find latest using parseSnapshotDateTime; snapshots whose name has no
      # parseable date are kept in %current_all but never become the "latest"
      my $epoch = parseSnapshotDateTime($snapname);
      
      if (defined $epoch) {
         if (!exists $current_latest{$fs} || $epoch > $current_latest{$fs}->{epoch}) {
            $current_latest{$fs} = {
               snap => $snapname,
               epoch => $epoch,
               full => $tok
            };
         }
      }
   }
   
   # Build filesystem hierarchy to detect parent/child relationships
   # (union of filesystems seen on either the original or the current side)
   my @all_filesystems = sort keys %{{ map { $_ => 1 } (keys %original, keys %current_latest) }};
   
   logMsg( "found " . scalar(@all_filesystems) . " unique filesystems", 1 ) if $verboseLoggingLevel >= 5;
   
   # Generate report for each filesystem
   my @report;
   my %processed;  # Track which filesystems we've already reported
   
   foreach my $fs (sort @all_filesystems) {
      next if $processed{$fs};
      
      my $orig_snap = $original{$fs};
      my $curr_snap = $current_latest{$fs} ? $current_latest{$fs}->{full} : undef;
      
      # Check if this is a parent with children (prefix match on "fs/")
      my @children = grep { /^\Q$fs\E\// && $_ ne $fs } @all_filesystems;
      
      if (@children) {
         # Check if all children have the same change pattern. Consistency
         # compares only the change *category* (changed/unchanged/added), not
         # the actual snapshot names or timestamps.
         my $parent_changed = defined($orig_snap) && defined($curr_snap) && $orig_snap ne $curr_snap;
         my $parent_unchanged = defined($orig_snap) && defined($curr_snap) && $orig_snap eq $curr_snap;
         my $parent_added = !defined($orig_snap) && defined($curr_snap);
         
         # NOTE(review): if the parent matches none of the three states above
         # (e.g. it has an original snapshot but no current one - the REMOVED
         # case), no branch in the loop below can clear $consistent, so the
         # parent is reported CONSISTENT and its children are skipped even if
         # they differ. Confirm this is intended for removed datasets.
         my $consistent = 1;
         foreach my $child (@children) {
            my $child_orig = $original{$child};
            my $child_curr = $current_latest{$child} ? $current_latest{$child}->{full} : undef;
            
            my $child_changed = defined($child_orig) && defined($child_curr) && $child_orig ne $child_curr;
            my $child_unchanged = defined($child_orig) && defined($child_curr) && $child_orig eq $child_curr;
            my $child_added = !defined($child_orig) && defined($child_curr);
            
            # Check if child matches parent pattern
            if ($parent_changed && !$child_changed) {
               $consistent = 0;
               last;
            }
            if ($parent_unchanged && !$child_unchanged) {
               $consistent = 0;
               last;
            }
            if ($parent_added && !$child_added) {
               $consistent = 0;
               last;
            }
         }
         
         if ($consistent) {
            # Report parent as CONSISTENT and skip children
            my $line = formatReportLine($fs, $orig_snap, $curr_snap, $current_all{$fs}, "CONSISTENT");
            push @report, $line;
            $processed{$fs} = 1;
            foreach my $child (@children) {
               $processed{$child} = 1;
            }
            next;
         }
      }
      
      # Not consistent or no children - report individually
      my $line = formatReportLine($fs, $orig_snap, $curr_snap, $current_all{$fs}, undef);
      push @report, $line;
      $processed{$fs} = 1;
   }
   
   logMsg( "generated " . scalar(@report) . " report lines", 1 ) if $verboseLoggingLevel >= 4;
   
   return \@report;
}

## formatReportLine: Helper function to format a single report line
##
## Parameters:
##   $fs - Filesystem name
##   $orig_snap - Original snapshot (full name with dataset\@snapshot)
##   $curr_snap - Current latest snapshot (full name)
##   $all_snaps_ref - Arrayref of all current snapshot names for this filesystem
##   $flag - Optional flag (CONSISTENT, ADDED, UNCHANGED)
##
## Returns:
##   Formatted report line (string)
sub formatReportLine {
   my ($fs, $orig_snap, $curr_snap, $all_snaps_ref, $flag) = @_;

   $all_snaps_ref ||= [];

   my $have_orig = defined $orig_snap;
   my $have_curr = defined $curr_snap;

   # Guard clauses for the degenerate states, most restrictive first.

   # Neither side has a snapshot at all
   return "$fs: NO SNAPSHOTS" if !$have_orig && !$have_curr;

   # Only the current side exists: a newly-added dataset
   if ( !$have_orig ) {
      my $label = $flag || "ADDED";
      return "$fs: $label - current: $curr_snap (no original snapshot)";
   }

   # Only the original side exists - shouldn't happen in normal replication
   return "$fs: REMOVED - original: $orig_snap (no current snapshot)" if !$have_curr;

   # Both present and identical: nothing changed
   if ( $orig_snap eq $curr_snap ) {
      my $label = $flag || "UNCHANGED";
      return "$fs: $label - $orig_snap";
   }

   # Changed: count the snapshots whose timestamps fall after the original
   # and up to (and including) the current one.
   my $orig_name  = ( split /\@/, $orig_snap, 2 )[1];
   my $curr_name  = ( split /\@/, $curr_snap, 2 )[1];
   my $orig_epoch = parseSnapshotDateTime($orig_name) || 0;
   my $curr_epoch = parseSnapshotDateTime($curr_name) || 0;

   my $count = 0;
   foreach my $candidate (@$all_snaps_ref) {
      my $when = parseSnapshotDateTime($candidate);
      next unless defined $when;
      $count++ if $when > $orig_epoch && $when <= $curr_epoch;
   }

   my $prefix = $flag ? "$flag - " : "";
   my $plural = $count == 1 ? "snapshot" : "snapshots";
   return "$fs: ${prefix}original: $orig_snap, current: $curr_snap ($count $plural added)";
}

## datasetExists: Check if a ZFS dataset exists on the system.
##
## This function verifies the existence of a ZFS dataset (filesystem or volume) by querying
## ZFS and filtering the output. Uses grep to suppress error messages that would appear on
## the console if `zfs list` is run directly on a non-existent dataset.
##
## Arguments:
##   $dataset - The full dataset name to check (string, e.g., "pool/data" or "backup/files_share")
##
## Behavior:
##   - Executes `zfs list -H -o name` to get all datasets
##   - Pipes through grep with anchored pattern (^$dataset$) to match exact name
##   - The grep suppresses stderr from zfs list, preventing console clutter
##   - Returns the dataset name if found, empty string if not found
##
## Returns:
##   - String containing the dataset name if it exists (truthy in boolean context)
##   - Empty string if dataset does not exist (falsy in boolean context)
##
## Example usage:
##   if (datasetExists('backup/mydata')) {
##      logMsg("Dataset exists, proceeding...");
##   } else {
##      runCmd("zfs create backup/mydata");
##   }
sub datasetExists {
   my $dataset = shift;
   
   # Query ZFS and filter with grep to avoid stderr messages on console.
   # If the dataset exists, zfs list returns the name; if not, it would
   # normally print an error to stderr. By piping through grep, we suppress
   # the error and get a clean truthy/falsy return value.
   #
   # FIX: use grep -Fx (fixed-string, whole-line match) instead of an
   # anchored regex, so regex metacharacters that are legal in dataset
   # names (e.g. '.') cannot cause false positives; '--' guards against a
   # name being taken as an option.
   my $return = runCmd( "zfs list -H -o name | grep -Fx -- '$dataset'" );
   
   return $return;
}


1;
