Running a flow automatically#

It is frequently desirable to run a flow when new data becomes available. This document provides examples of how to accomplish this, using watchdog to monitor for filesystem events and the Globus Automate CLI to run already-defined flows.
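
Both tools are available on PyPI. A minimal setup might look like the following sketch (the virtual environment is optional; the watchmedo extra installs the dependencies that the watchmedo CLI tool requires):

python -m venv venv
source venv/bin/activate
pip install 'watchdog[watchmedo]' globus-automate-client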

Shell scripting#

watchdog provides a CLI tool named watchmedo. If you’re comfortable with shell scripting, you can use the following command to run a script named runner.sh whenever a filesystem event occurs in or below the current directory.

watchmedo.sh [download]#
watchmedo shell-command \
    --command 'bash runner.sh "${watch_event_type}" "${watch_src_path}"' \
    --recursive \
    .

watchmedo substitutes ${watch_event_type} and ${watch_src_path} with the event type and the affected path before invoking the command, so runner.sh receives them as its positional arguments. However, the watchmedo command currently has no way to filter filesystem events by type. On Linux, runner.sh will be run when:

  • a file is created (but has not had data written yet)

  • data is written to the file (but the file has not been closed yet)

  • the file is closed

This will likely result in your flow running far more often than expected. To avoid this, runner.sh must filter the incoming filesystem events itself. Here is an example script that runs a flow with custom input only when the filesystem event type is "closed":

runner.sh [download]#
#!/bin/bash
set -eu

# Only run a flow after a file has been closed,
# signaling that all data have been written to it.
if [ "$1" != "closed" ]; then
    exit
fi

# The ID of the flow to run (replace with your flow ID).
FLOW_ID="your-flow-id-here"

# Construct the flow input document from the event type and file path.
FLOW_INPUT=$(cat << EOF
{
    "event": "$1",
    "filename": "$2"
}
EOF
)

globus-automate flow run \
    --label "File change: $1" \
    --flow-input "${FLOW_INPUT}" \
    "${FLOW_ID}"

Note

Filesystem events are less granular on Windows platforms. Notably, there is no "closed" event type to signal all data have been written to a file. There is only a "modified" event type, which may fire multiple times if large files are written and flushed to disk in multiple chunks.

Users on Windows may want to use the Python script below.

Python scripting#

If you know that files will be created or modified in large batches, you may need to write a script that monitors for filesystem events and waits for them to taper off before running a flow. One way to do this is with the watchdog package.

The script below will monitor for filesystem events of all types. It will run a flow only after 60 seconds have passed without any new filesystem events.

runner.py [download]#
import logging
import os
import pathlib
import queue
import sys

# The flow to run.
FLOW_ID = "your-flow-id-here"

# The flow will be run COOL_OFF_TIME_S seconds after the most recent filesystem event
# is received. If no filesystem events are ever received, the flow will not be run.
COOL_OFF_TIME_S = 60


logging.basicConfig(
    level=logging.WARNING,  # Eliminate INFO messages from the Globus SDK.
    format="%(asctime)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

try:
    from globus_automate_client import create_flows_client
except ImportError:
    log.error(
        "The globus_automate_client package is not installed."
        " (Do you need to activate a virtual environment?)"
    )
    sys.exit(1)

try:
    import watchdog
    import watchdog.events
    import watchdog.observers
except ImportError:
    log.error(
        "The watchdog package is not installed."
        " (Do you need to activate a virtual environment?)"
    )
    sys.exit(1)


class Handler(watchdog.events.FileSystemEventHandler):
    def __init__(self, events: queue.Queue):
        self.events = events

    def dispatch(self, event):
        """Put all filesystem events in a queue."""

        self.events.put(event)


def main():
    try:
        path = pathlib.Path(sys.argv[1]).absolute()
    except IndexError:
        path = pathlib.Path(os.getcwd()).absolute()
    log.warning(f"Monitoring {path}")
    log.warning("Press CTRL-C to exit (on Windows, press CTRL-BREAK)")

    event_queue = queue.Queue(maxsize=-1)
    handler = Handler(event_queue)
    observer = watchdog.observers.Observer()
    observer.schedule(handler, str(path), recursive=True)
    observer.start()

    flows_client = create_flows_client()

    try:
        timeout = None
        files = set()
        while True:
            try:
                event = event_queue.get(block=True, timeout=timeout)
            except queue.Empty:
                # .get() timed out.
                # It's now been COOL_OFF_TIME_S seconds since the last filesystem event.
                # Reset the timeout for the next batch of files and run the flow.
                timeout = None
                log.warning(f"Running the flow ({len(files)} paths were modified)")
                flows_client.run_flow(
                    flow_id=FLOW_ID,
                    flow_scope=None,
                    flow_input={
                        "count": len(files),
                    },
                    label=f"[AUTO] File system changes detected ({len(files)} paths)",
                )
                files = set()
            else:
                # .get() returned a filesystem event.
                # Make sure the next .get() call times out after COOL_OFF_TIME_S.
                timeout = COOL_OFF_TIME_S
                files.add(event.src_path)
                event_queue.task_done()
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()
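
A usage sketch, assuming the script above was saved as runner.py (the directory path is illustrative; if the argument is omitted, the current working directory is monitored):

# Monitor a specific directory:
python runner.py /data/incoming

# Or monitor the current working directory:
python runner.py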