PyAsyncScheduler (version 0.0.1)
pyasyncscheduler.py

An asynchronous task scheduler, with cron syntax, intervals, limits,
dynamic configuration, and optional vault integration.

This is an asynchronous scheduler designed to execute both synchronous and asynchronous
tasks with flexible timing and configuration options. It supports cron expressions, sleep durations, fixed
intervals, start/end time windows, and occurrence limits.

Tasks can be defined using simple wordlists, added dynamically at runtime, and configured individually with
execution constraints. While most tasks are expected to be asynchronous, synchronous
functions are fully supported.

Optional features include integration with a vault for secure configuration or secrets management. The scheduler
is built with extensibility and clarity in mind, suitable for running background jobs, periodic checks, lightweight
automation, or orchestrating custom workflows in asynchronous Python applications.

 
Classes
       
builtins.object
Argument
CronSchedule
TaskLimiter
TaskScheduler
builtins.tuple(builtins.object)
Arguments

 
class Argument(builtins.object)
    Argument(names: List[str], value: Any = None) -> None
 
Argument(names: List[str], value: Any = None)
 
  Methods defined here:
__eq__(self, other)
Return self==value.
__init__(self, names: List[str], value: Any = None) -> None
Initialize self.  See help(type(self)) for accurate signature.
__repr__(self)
Return repr(self).

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

Data and other attributes defined here:
__annotations__ = {'names': typing.List[str], 'value': typing.Any}
__dataclass_fields__ = {'names': Field(name='names',type=typing.List[str],default...appingproxy({}),kw_only=False,_field_type=_FIELD), 'value': Field(name='value',type=typing.Any,default=None,...appingproxy({}),kw_only=False,_field_type=_FIELD)}
__dataclass_params__ = _DataclassParams(init=True,repr=True,eq=True,order=False,unsafe_hash=False,frozen=False)
__hash__ = None
__match_args__ = ('names', 'value')
value = None
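
The attributes listed above (``eq=True``, ``frozen=False``, ``__hash__ = None``, ``value = None``) are consistent with a plain ``@dataclass`` declaration. The following is a minimal sketch of an equivalent class, reconstructed from this listing only; the module's real declaration may differ in details not shown here.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Argument:
    """Sketch of a dataclass with the fields shown in the listing above."""

    names: List[str]   # required field
    value: Any = None  # optional field, defaults to None


flag = Argument(["--vault", "-v"])
# eq=True: instances compare field by field.
same = flag == Argument(["--vault", "-v"], None)
# eq=True without frozen=True sets __hash__ to None, so instances are unhashable.
hashable = Argument.__hash__ is not None
```

Because instances are unhashable, they cannot be used as dictionary keys or set members.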

 
class Arguments(builtins.tuple)
    Arguments(vault, tasks_files)
 
Arguments(vault, tasks_files)
 
 
Method resolution order:
Arguments
builtins.tuple
builtins.object

Methods defined here:
__getnewargs__(self)
Return self as a plain tuple.  Used by copy and pickle.
__repr__(self)
Return a nicely formatted representation string
_asdict(self)
Return a new dict which maps field names to their values.
_replace(self, /, **kwds)
Return a new Arguments object replacing specified fields with new values

Class methods defined here:
_make(iterable) from builtins.type
Make a new Arguments object from a sequence or iterable

Static methods defined here:
__new__(_cls, vault, tasks_files)
Create new instance of Arguments(vault, tasks_files)

Data descriptors defined here:
vault
Alias for field number 0
tasks_files
Alias for field number 1

Data and other attributes defined here:
__match_args__ = ('vault', 'tasks_files')
_field_defaults = {}
_fields = ('vault', 'tasks_files')

Methods inherited from builtins.tuple:
__add__(self, value, /)
Return self+value.
__contains__(self, key, /)
Return key in self.
__eq__(self, value, /)
Return self==value.
__ge__(self, value, /)
Return self>=value.
__getattribute__(self, name, /)
Return getattr(self, name).
__getitem__(self, key, /)
Return self[key].
__gt__(self, value, /)
Return self>value.
__hash__(self, /)
Return hash(self).
__iter__(self, /)
Implement iter(self).
__le__(self, value, /)
Return self<=value.
__len__(self, /)
Return len(self).
__lt__(self, value, /)
Return self<value.
__mul__(self, value, /)
Return self*value.
__ne__(self, value, /)
Return self!=value.
__rmul__(self, value, /)
Return value*self.
count(self, value, /)
Return number of occurrences of value.
index(self, value, start=0, stop=9223372036854775807, /)
Return first index of value.
 
Raises ValueError if the value is not present.

Class methods inherited from builtins.tuple:
__class_getitem__(...) from builtins.type
See PEP 585

 
class CronSchedule(builtins.object)
    CronSchedule(cron_expression: str, use_sunday_zero: bool = False)
 
Lightweight 5-field cron expression evaluator (no external libraries).
 
Supports lists ",", ranges "-", steps "/", and wildcard "*".
 
Fields: minute hour day month weekday (0=Monday per datetime.weekday()).
Note: Many crons use 0=Sunday, 6=Saturday; here we map 0-6 to Python's
Monday(0)..Sunday(6). If you need Sunday-based 0..6, set
``use_sunday_zero=True``.
 
  Methods defined here:
__init__(self, cron_expression: str, use_sunday_zero: bool = False)
Initialize self.  See help(type(self)) for accurate signature.
matches(self, match_datetime: datetime.datetime) -> bool
Return True if ``match_datetime`` matches the cron expression at minute-level granularity.

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)
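
To illustrate the field syntax and the two weekday conventions described above, here is a minimal standalone sketch of a 5-field matcher. It is not the module's implementation: ``expand_field`` and ``cron_matches`` are hypothetical names, and the sketch assumes all five fields must match (classic cron treats day and weekday differently when both are restricted).

```python
from datetime import datetime
from typing import Set


def expand_field(field: str, low: int, high: int) -> Set[int]:
    """Expand one cron field (lists, ranges, steps, wildcard) into a set."""
    values: Set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # A single value with a step ("5/10") runs up to the field maximum.
            end = high if step_text else start
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression: str, moment: datetime,
                 use_sunday_zero: bool = False) -> bool:
    """Return True if ``moment`` matches ``expression`` at minute granularity."""
    minute, hour, day, month, weekday = expression.split()
    if use_sunday_zero:
        # Convert Python's Monday=0..Sunday=6 to Sunday=0..Saturday=6.
        current_weekday = (moment.weekday() + 1) % 7
    else:
        current_weekday = moment.weekday()
    return (
        moment.minute in expand_field(minute, 0, 59)
        and moment.hour in expand_field(hour, 0, 23)
        and moment.day in expand_field(day, 1, 31)
        and moment.month in expand_field(month, 1, 12)
        and current_weekday in expand_field(weekday, 0, 6)
    )


sunday_midnight = datetime(2025, 6, 22, 0, 0)  # 2025-06-22 is a Sunday
```

With the default convention, Sunday is ``6``, so ``"0 0 * * 6"`` matches ``sunday_midnight``; with ``use_sunday_zero=True`` the same moment matches ``"0 0 * * 0"`` instead.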

 
class TaskLimiter(builtins.object)
    TaskLimiter(limit: int, period_seconds: int)
 
Per-instance execution limiter using a sliding time window.
 
This limiter *delays* executions instead of dropping them so that
"every instance runs" as required. Use :meth:`await_permit` to
asynchronously wait until the next execution is allowed.
 
Args:
    limit: Maximum number of executions allowed within ``period_seconds``.
    period_seconds: Sliding window size, in seconds.
 
  Methods defined here:
__init__(self, limit: int, period_seconds: int)
Initialize self.  See help(type(self)) for accurate signature.
async await_permit(self) -> None
This method waits until an execution slot is available, then records the run.

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)
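
The delaying behaviour described above can be sketched with a deque of timestamps. This is an illustrative sliding-window limiter under stated assumptions, not the module's code: the class name ``SlidingWindowLimiter`` is hypothetical, and the sketch accepts a float period so the demo runs quickly.

```python
import asyncio
import time
from collections import deque
from typing import Deque


class SlidingWindowLimiter:
    """Delay callers so that at most ``limit`` runs start per window."""

    def __init__(self, limit: int, period_seconds: float) -> None:
        self.limit = limit
        self.period_seconds = period_seconds
        self._runs: Deque[float] = deque()  # start times of recent runs

    async def await_permit(self) -> None:
        while True:
            now = time.monotonic()
            # Forget runs that have left the sliding window.
            while self._runs and now - self._runs[0] >= self.period_seconds:
                self._runs.popleft()
            if len(self._runs) < self.limit:
                self._runs.append(now)  # record this run and let it proceed
                return
            # Window is full: sleep until the oldest run expires, then retry.
            await asyncio.sleep(self.period_seconds - (now - self._runs[0]))


async def demo() -> float:
    """Request three permits with a limit of two per 0.2 seconds."""
    limiter = SlidingWindowLimiter(limit=2, period_seconds=0.2)
    started = time.monotonic()
    for _ in range(3):
        await limiter.await_permit()
    return time.monotonic() - started


if __name__ == "__main__":
    print(f"three permits took {asyncio.run(demo()):.2f}s")
```

The third request is not dropped; it is held until the first recorded run is older than the window.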

 
class TaskScheduler(builtins.object)
    TaskScheduler(start_callable: Callable[[str], Union[Awaitable[Any], threading.Thread, NoneType]], process_result: Optional[Callable[[Any, Dict[str, Any]], NoneType]] = None, worker_count: int = 4, external_coroutines: List[Coroutine] = [], vault: ~PasswordVault = None)
 
Secure, dependency-free task scheduler supporting cron/interval/sleep,
per-time-window limits, async workers, optional threading, runtime task
registration, and CSV-based wordlists -> Template expansion.
 
Configuration schema per task (JSON):
 
{
  "url": "https://${edr_user}@${edr_password}:api/foo?u=${user}&p=${pass}",
  "csv_inputs": ["users.csv", "passes.csv"],
  "credentials": {"edr": {"category": "EDR", "role": "events_reader"}},
  "cron": "*/5 * * * *",
  "limit": {"max_executions": 100, "per_seconds": 60},
  "instance_spacing": 1
}
 
{
  "url": "https://api/foo?u=${user}&p=${pass}",
  "csv_inputs": ["users.csv", "passes.csv"],
  "start_time": "2025-06-22T00:00:00",
  "end_time":   "2025-06-22T06:00:00",
  "occurrences": 12
}
 
{
  "url": "https://api/foo?u=${user}&p=${pass}",
  "csv_inputs": ["users.csv", "passes.csv"],
  "sleep": 3600
}
 
Notes:
- Each *instance* is one rendered template string from CSV rows.
- If a limit is set, the limiter *delays* launches to honor the window.
- If a run cycle takes longer than the nominal schedule period, the next
  cycle will start immediately after the current one completes (i.e., run late).
- For threading: if the provided ``start_callable`` returns a ``threading.Thread``
  (not started), the scheduler will ``start()`` it and later ``join()`` it.
  To pass back a result from threads, expose a ``.result`` attribute on the
  thread object (optional). Async callables should return an awaitable.
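
The threading convention in the last note can be sketched as follows. This is a standalone illustration, not the scheduler's code: ``ResultThread`` and ``run_instance`` are hypothetical names, and ``run_instance`` only mimics the documented start/join/read ``.result`` sequence.

```python
import threading
from typing import Any, Optional


class ResultThread(threading.Thread):
    """Thread that exposes its outcome through a ``result`` attribute."""

    def __init__(self, rendered: str) -> None:
        super().__init__()
        self.rendered = rendered
        self.result: Optional[Any] = None

    def run(self) -> None:
        # Placeholder work: a real task would act on the rendered string.
        self.result = len(self.rendered)


def start_callable(rendered: str) -> threading.Thread:
    """Return an unstarted thread, as the note above requires."""
    return ResultThread(rendered)


def run_instance(rendered: str) -> Any:
    """Mimic the documented handling: start the thread, join it, read .result."""
    thread = start_callable(rendered)
    thread.start()
    thread.join()
    return getattr(thread, "result", None)
```

Reading the attribute with ``getattr(..., None)`` keeps ``.result`` optional, matching the note.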
 
  Methods defined here:
__init__(self, start_callable: Callable[[str], Union[Awaitable[Any], threading.Thread, NoneType]], process_result: Optional[Callable[[Any, Dict[str, Any]], NoneType]] = None, worker_count: int = 4, external_coroutines: List[Coroutine] = [], vault: ~PasswordVault = None)
Initialize self.  See help(type(self)) for accurate signature.
add_task(self, task_config: Dict[str, Union[str, int, List[str]]], id_: str = None, parent_id: str = None) -> Dict[str, Union[str, int, NoneType]]
This method registers a new task at runtime.
get_task_result(self, id_: str, parent_id: str) -> Dict[str, Union[str, int, NoneType]]
This method returns the stored result of the task identified by ``id_`` and ``parent_id``.
async run(self) -> None
Start workers and the supervisor. This never returns.

Static methods defined here:
check_and_build_task(task_config: Dict[str, Union[str, int, List[str]]], task: Dict[str, ~Json]) -> None
This method checks the task format and attributes.
 
Raises ValueError or TypeError if a value is invalid.

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
Functions
       
default_handle_result(data: Dict[str, Dict[str, Union[str, int, NoneType]]], response: Dict[str, Union[str, int, NoneType]], task: Dict[str, ~Json]) -> None
The default handler to add a task result to results storage.
exit(status=None, /)
Exit the interpreter by raising SystemExit(status).
 
If the status is omitted or None, it defaults to zero (i.e., success).
If the status is an integer, it will be used as the system exit status.
If it is another kind of object, it will be printed and the system
exit status will be one (i.e., failure).
expand_from_csvs(template: str, csv_paths: Sequence[str], **additional_data) -> List[Tuple[str, Dict[str, str]]]
This function builds the Cartesian product of multiple CSVs and renders a Template per combination.
 
For each combination of one row from each CSV, merge the row dicts (later
CSVs overwrite keys on collision), then render the ``template`` with
``string.Template.safe_substitute``.
 
Args:
    template: A string.Template with placeholders like ``${user}``.
    csv_paths: A list of CSV file paths.
 
Returns:
    A list of tuples: (rendered_config_string, merged_mapping_dict).
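
The expansion described above can be sketched with ``itertools.product`` and ``string.Template``. This is a minimal illustration, not the module's code: ``expand_rows`` is a hypothetical helper that takes rows already loaded in memory instead of CSV paths and ignores ``**additional_data``.

```python
from itertools import product
from string import Template
from typing import Dict, List, Sequence, Tuple


def expand_rows(
    template: str, tables: Sequence[List[Dict[str, str]]]
) -> List[Tuple[str, Dict[str, str]]]:
    """Render ``template`` once per combination of one row from each table."""
    rendered: List[Tuple[str, Dict[str, str]]] = []
    for combination in product(*tables):
        merged: Dict[str, str] = {}
        for row in combination:
            merged.update(row)  # later tables overwrite keys on collision
        # safe_substitute leaves unknown placeholders untouched.
        rendered.append((Template(template).safe_substitute(merged), merged))
    return rendered


users = [{"user": "alice"}, {"user": "bob"}]
passes = [{"pass": "a1"}, {"pass": "b2"}]
instances = expand_rows("https://api/foo?u=${user}&p=${pass}", [users, passes])
```

Two tables of two rows each yield four instances; a placeholder with no matching column stays in the rendered string instead of raising ``KeyError``.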
get_event_loop()
Return an asyncio event loop.
 
When called from a coroutine or a callback (e.g. scheduled with
call_soon or similar API), this function will always return the
running event loop.
 
If there is no running event loop set, the function will return
the result of `get_event_loop_policy().get_event_loop()` call.
load_csv_rows(csv_path: str) -> List[Dict[str, str]]
This function loads a CSV into a list of row dictionaries using the header row as keys.
 
Args:
    csv_path: Path to a CSV file.
Returns:
    List of rows mapping column name -> cell value (all strings).
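
A loader with this contract can be sketched with ``csv.DictReader``. The sketch below is an assumption about one reasonable implementation, not the module's code; ``read_csv_rows`` is a hypothetical name and UTF-8 encoding is assumed.

```python
import csv
import os
import tempfile
from typing import Dict, List


def read_csv_rows(csv_path: str) -> List[Dict[str, str]]:
    """Load a CSV into row dictionaries keyed by the header row."""
    with open(csv_path, newline="", encoding="utf-8") as csv_file:
        return [dict(row) for row in csv.DictReader(csv_file)]


# Demonstrate on a temporary file so the example is self-contained.
with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, "users.csv")
    with open(path, "w", newline="", encoding="utf-8") as csv_file:
        csv_file.write("user,role\nalice,admin\nbob,reader\n")
    rows = read_csv_rows(path)
```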
main()
The main function to start the default scheduler services.
 
Requirements are not optional.
parse_args(argv: List[str]) -> PyAsyncScheduler.Arguments
This function parses a command line with a single optional argument
(--vault or -v) followed by the vault name, and collects the remaining
strings as tasks configuration files.
 
Args:
    argv: List of command-line arguments (typically sys.argv[1:]).
 
Returns:
    Arguments: A named tuple with 'vault' (str or None)
                and 'tasks_files' (list of strings).
 
Raises:
    ValueError: If the vault flag is provided without a value.
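
The parsing rules described above can be sketched as a small loop. This is an illustration of the documented behaviour only, not the module's code: ``parse_arguments`` is a hypothetical name, and the sketch assumes the vault flag may appear anywhere among the file names.

```python
from collections import namedtuple
from typing import List

Arguments = namedtuple("Arguments", ["vault", "tasks_files"])


def parse_arguments(argv: List[str]) -> Arguments:
    """Extract an optional vault name and collect the remaining file names."""
    vault = None
    tasks_files: List[str] = []
    position = 0
    while position < len(argv):
        argument = argv[position]
        if argument in ("--vault", "-v"):
            if position + 1 >= len(argv):
                raise ValueError("the vault flag requires a value")
            vault = argv[position + 1]
            position += 2  # skip the flag and its value
        else:
            tasks_files.append(argument)
            position += 1
    return Arguments(vault, tasks_files)


parsed = parse_arguments(["-v", "myvault", "tasks1.json", "tasks2.json"])
```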
warn(message, category=None, stacklevel=1, source=None)
Issue a warning, or maybe ignore it or raise an exception.

 
Data
        Any = typing.Any
Awaitable = typing.Awaitable
Callable = typing.Callable
Coroutine = typing.Coroutine
Deque = typing.Deque
Dict = typing.Dict
Json = ~Json
List = typing.List
Optional = typing.Optional
PasswordVault = ~PasswordVault
Sequence = typing.Sequence
Set = typing.Set
Tuple = typing.Tuple
Union = typing.Union
__author_email__ = 'mauricelambert434@gmail.com'
__copyright__ = '\nPyAsyncScheduler Copyright (C) 2025 Maurice L...ome to redistribute it\nunder certain conditions.\n'
__description__ = '\nAn asynchronous task scheduler, with cron synta...c configuration, and optional vault integration.\n'
__license__ = 'GPL-3.0 License'
__maintainer__ = 'Maurice Lambert'
__maintainer_email__ = 'mauricelambert434@gmail.com'
__url__ = 'https://github.com/mauricelambert/PyAsyncScheduler'
argv = [r'C:\Program Files\Python310\lib\pydoc.py', '-w', r'PyAsyncScheduler.py']
copyright = '\nPyAsyncScheduler Copyright (C) 2025 Maurice L...ome to redistribute it\nunder certain conditions.\n'
license = 'GPL-3.0 License'
vault_imported = False

 
Author
        Maurice Lambert