NAME

TheSchwartz::JobScheduler - Lightweight TheSchwartz job dispatcher with maintained database connections

VERSION

version 0.002

SYNOPSIS

    use TheSchwartz::JobScheduler;
    my @databases = (
        { id => 'db_1', prefix => 'theschwartz_schema.', },
        { id => 'db_2', prefix => 'theschwartz_schema.', },
    );
    use Database::ManagedHandle;
    sub get_dbh {
        my ($db_id) = @_;
        my $mh1 = Database::ManagedHandle->instance;
        return $mh1->dbh( $db_id );
    }

    my $client = TheSchwartz::JobScheduler->new(
        databases => \@databases,
        dbh_callback => \&get_dbh,
        );
    my $job_id = $client->insert(
        job => TheSchwartz::JobScheduler::Job->new(
                    funcname => 'fetch',
                    arg      => {type => 'site', url => 'https://example.com/'},
                    ),
    );

    my $job1 = TheSchwartz::JobScheduler::Job->new;
    $job1->funcname("WorkerName");
    $job1->arg({ foo => "bar" });
    $job1->uniqkey("uniqkey");
    $job1->run_after( time + 60 );
    $client->insert( job => $job1 );
    my $job2 = TheSchwartz::JobScheduler::Job->new(
        funcname => 'WorkerName',
        arg => { foo => 'baz' },
        );
    $client->insert( job => $job2 );

    my @jobs = $client->list_jobs( search_params => { funcname => 'funcname' }, );
    for my $job (@jobs) {
        print $job->jobid;
    }

DESCRIPTION

TheSchwartz::JobScheduler is an interface to insert a new job into TheSchwartz job queue (maintained by a database).

The rationale behind this module is its use in a long-running web service, for instance, one built with Dancer2. Because database connections cannot be relied on to stay open indefinitely, the module gets a new database handle for each operation.

This module was created solely for the purpose of injecting a new job from web servers without loading additional TheSchwartz and Data::ObjectDriver modules onto your system. Your TheSchwartz job worker processes will still need to be implemented using the full-featured TheSchwartz::Worker module.

Configuration: Databases and Their Handles

TheSchwartz can use several different databases simultaneously, for instance, to share load and distribute jobs safely to only those workers who could, in turn, demand restricted access. This makes TheSchwartz very decentralized.

If your setup is reasonably simple, for instance, a webapp (e.g. Dancer2) with TheSchwartz as a worker system executing long-running tasks which would otherwise disrupt the webapp, then perhaps you only use one database. In that case, you can consider using the same database handle in both the webapp and TheSchwartz. If you use database transactions to ensure an atomic commit, you can involve TheSchwartz::JobScheduler in the same transaction. If your transaction fails after the worker task is inserted, the worker task is also cancelled (rolled back).

If, however, your TheSchwartz system is complex or otherwise separate from the systems which create the tasks, or you simply use more than one database in TheSchwartz, you cannot share your other database handles with TheSchwartz::JobScheduler. The scheduler might need to access all databases in sequence to place the task in the right one. Besides this, TheSchwartz::JobScheduler is prepared for the possibility of one or more databases being off-line. It loops through all the databases until it gets a working database handle.

Database handles are provided by the calling program. This allows the caller to use any available system to provide the handles. If TheSchwartz::JobScheduler receives undef instead of a database handle, it tries the next database. If there are no working database handles, it croaks.
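
The failover behaviour described above can be sketched in a few lines. This is a minimal illustration, not the module's actual code: the helper name first_working_dbh, the database ids and the stub callback are hypothetical, and a plain string stands in for a real DBI handle.

```perl
use strict;
use warnings;
use Carp qw(croak);

# Hypothetical helper: ask the callback for a handle to each database
# in turn and return the first one that is defined.
sub first_working_dbh {
    my ($callback, @db_ids) = @_;
    for my $id (@db_ids) {
        my $dbh = $callback->($id);
        return ($id, $dbh) if defined $dbh;
    }
    croak 'No working database handle available';
}

# Stub callback for illustration: db_1 is "off-line" (undef),
# db_2 returns a placeholder string instead of a real DBI handle.
my %handles = ( db_1 => undef, db_2 => 'handle-for-db_2' );
my $get_dbh = sub { my ($id) = @_; return $handles{$id}; };

my ($used_id, $dbh) = first_working_dbh( $get_dbh, 'db_1', 'db_2' );
print "Using database $used_id\n";
```

Because the callback only has to return a handle or undef, the caller stays free to decide how handles are created, cached or checked.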

The database configuration does not need database addresses, DSNs, or usernames and passwords. Because TheSchwartz::JobScheduler gets the database handle from outside, it only needs to know a database id to tell the databases apart and an optional prefix for each database. The prefix is prepended to every database table and sequence name. If your database uses a different schema than the default one for TheSchwartz tables, use the prefix to solve this.

    use DBI;
    # Connection information for each database id
    my %dbs = (
        db_1 => [ 'dbi:SQLite:...', undef, undef, {} ],
        db_2 => [ 'dbi:SQLite:...', undef, undef, {} ],
    );
    # Return a new database handle for the given database id
    sub get_dbh {
        my ($id) = @_;
        my @connection_info = @{ $dbs{ $id } };
        return DBI->connect( @connection_info );
    }
    my %databases = (
        db_1 => { prefix => 'theschwartz_schema.', dbh_callback => \&get_dbh, },
        db_2 => { prefix => 'another_schema.', dbh_callback => \&get_dbh, },
    );
    use TheSchwartz::JobScheduler;
    my $scheduler = TheSchwartz::JobScheduler->new(
        databases => \%databases,
        dbh_callback => \&get_dbh,
        );
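
To make the role of the prefix concrete, here is a small sketch of how a prefix could turn a bare table name into a schema-qualified one. The helper qualified_name is hypothetical, and plain string concatenation is an assumption about how the prefix is applied.

```perl
use strict;
use warnings;

# Hypothetical helper. Assumption: the prefix is simply concatenated
# in front of the table or sequence name; no prefix means no change.
sub qualified_name {
    my ($prefix, $name) = @_;
    return ( $prefix // q{} ) . $name;
}

# 'job' and 'funcmap' are two of the tables in TheSchwartz's schema.
print qualified_name( 'theschwartz_schema.', 'job' ), "\n";
print qualified_name( undef, 'funcmap' ), "\n";
```

Note that in the configuration examples the trailing dot is part of the prefix itself.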

In the following example the calling program is using Database::ManagedHandle, a module which makes certain that a database handle is always usable.

    # First create a Database::ManagedHandle config class
    # See Database::ManagedHandle for instructions
    # Then just use it:
    my %databases = (
        db_1 => {
            prefix => 'theschwartz_schema.',
        },
        db_2 => {
            prefix => 'another_schema.',
        },
    );
    use TheSchwartz::JobScheduler;
    my $scheduler = TheSchwartz::JobScheduler->new(
        databases => \%databases,
        dbh_callback => 'Database::ManagedHandle->instance',
    );

DBH Callback

The item dbh_callback can be either a CODE reference, i.e. a subroutine, or a string which, when executed with eval, will produce an object. This object must have at least one method: dbh(). This method, when called, must return either a DBI::db object (such as one created by DBI->connect) or undef.

    use TheSchwartz::JobScheduler;
    my $scheduler = TheSchwartz::JobScheduler->new(
        databases => \%databases,
        dbh_callback => 'Database::ManagedHandle->instance',
    );
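
The two accepted forms can be pictured with the following sketch. It is not the module's own code: resolve_dbh and My::HandleProvider are hypothetical names, the handles are placeholder strings, and passing the database id to dbh() is an assumption based on the synopsis.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a handle provider such as
# Database::ManagedHandle: a class with instance() and dbh().
package My::HandleProvider {
    my $instance;
    sub instance { return $instance //= bless {}, shift }
    sub dbh {
        my ($self, $db_id) = @_;
        return "handle-for-$db_id";    # placeholder, not a real DBI handle
    }
}

# Hypothetical helper showing the two forms of dbh_callback.
sub resolve_dbh {
    my ($callback, $db_id) = @_;
    # Form 1: a CODE reference is called directly.
    return $callback->($db_id) if ref($callback) eq 'CODE';
    # Form 2: a string is evaluated to produce an object with dbh().
    my $provider = eval $callback;
    die "Cannot evaluate dbh_callback: $@" if !defined $provider;
    return $provider->dbh($db_id);
}

print resolve_dbh( sub { "from-coderef-$_[0]" }, 'db_1' ), "\n";
print resolve_dbh( 'My::HandleProvider->instance', 'db_2' ), "\n";
```

The string form is convenient when the callback has to be written in a configuration file, where a CODE reference cannot be expressed.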

You can specify dbh_callback either when creating the client object or when calling insert() or list_jobs(). If you are using TheSchwartz::JobScheduler as part of another system, for example, a web service, you will probably want to share one opened database handle because that will allow you to include TheSchwartz::JobScheduler in a transaction.

    my %databases = (
        db_1 => { prefix => 'theschwartz_schema.', },
        db_2 => { prefix => 'another_schema.', },
    );
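    # Connection information is kept separately from the TheSchwartz
    # database configuration above.
    my %connection_info = (
        db_1 => [ 'dbi:SQLite:...', undef, undef, {} ],
        db_2 => [ 'dbi:SQLite:...', undef, undef, {} ],
    );
    use DBI;
    sub get_dbh {
        my ($id) = @_;
        return DBI->connect( @{ $connection_info{ $id } } );
    }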
    use TheSchwartz::JobScheduler;
    my $scheduler = TheSchwartz::JobScheduler->new(
        databases => \%databases,
    );
    my $job = TheSchwartz::JobScheduler::Job->new(
        funcname => 'my_func',
        );
    $scheduler->insert(
        job => $job,
        dbh_callback => \&get_dbh,
        );

Uniqkey

The uniqkey field is an arbitrary string identifier used to prevent applications from posting duplicate jobs. At most one job with the same uniqkey value can be posted to a single TheSchwartz database.

There are, however, valid situations in which inserting the same job with the same uniqkey makes sense. For instance, several different actions, performed one after another but independent of each other, may each require the same job to be run.

Note that the job arguments are not considered when determining uniqueness; only the job name and the unique key (the funcid and uniqkey fields) are.

Depending on the database and whether uniqueness is protected with database constraints, such as primary keys, trying to insert another job with the same uniqkey can cause an error, overwrite the previous row with new content and new arguments, or create another row.

The user can choose how to deal with this situation. When instantiating TheSchwartz::JobScheduler, the user can define the additional option handle_uniqkey with any of the following values:

no_check

This option does not check for an existing job with the same uniqkey. If the database is configured to reject the insert operation, it will throw an exception. The user must be prepared for this, for instance, by enclosing the operation in eval.

This is the default setting.

overwrite

Update the fields arg, insert_time, run_after, grabbed_until, priority and coalesce, and return the existing entry's jobid. This setting will create a slight overhead.

Not yet implemented.

acknowledge

If there is already a matching entry (funcid and uniqkey fields), no change will be made. The jobid of the existing entry will be returned. This setting will create a slight overhead.

N.B. This option takes effect only when the TheSchwartz::JobScheduler::Job has its uniqkey field set. If you don't use uniqkey, this problem will never arise.

N.B.2. Using either overwrite or acknowledge is recommended. Only in situations which require extreme throughput should you consider other alternatives for this problem.

    # Depending on the database table settings,
    # the second insert will either throw an exception
    # or succeed and leave invalid data in the table.
    my $scheduler = TheSchwartz::JobScheduler->new(
        databases => \%databases,
        dbh_callback => 'Database::ManagedHandle->instance',
        opts => {
            handle_uniqkey => 'no_check',
        },
    );
    my $job = TheSchwartz::JobScheduler::Job->new(
        funcname => 'Test::uniqkey',
        arg      => { an_item => 'value A' },
        uniqkey  => 'UNIQUE_STR_A',
        );
    $scheduler->insert( job => $job );
    $job = TheSchwartz::JobScheduler::Job->new(
        funcname => 'Test::uniqkey',
        arg      => { an_item => 'value B' },
        uniqkey  => 'UNIQUE_STR_A',
        );
    $scheduler->insert( job => $job );
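
The difference between no_check and acknowledge can be illustrated with an in-memory simulation. This is only a sketch of the semantics described above, not the module's implementation: simulated_insert and the hash standing in for the job table are hypothetical, the function name is used in place of funcid, and the table is assumed to enforce a unique constraint.

```perl
use strict;
use warnings;

# In-memory stand-in for the job table; keys are "funcname\0uniqkey".
my %job_table;
my $next_jobid = 1;

# Hypothetical insert routine simulating two handle_uniqkey modes
# against a table that has a unique constraint.
sub simulated_insert {
    my ($mode, %job) = @_;
    my $key = join "\0", $job{funcname}, $job{uniqkey};
    if ( exists $job_table{$key} ) {
        # 'acknowledge': leave the row alone, return the existing jobid.
        return $job_table{$key}{jobid} if $mode eq 'acknowledge';
        # 'no_check': the constraint violation surfaces as an exception.
        die "unique constraint violated\n";
    }
    $job_table{$key} = { %job, jobid => $next_jobid++ };
    return $job_table{$key}{jobid};
}

my %job_a = ( funcname => 'Test::uniqkey', uniqkey => 'UNIQUE_STR_A', arg => 'value A' );
my %job_b = ( %job_a, arg => 'value B' );

my $first  = simulated_insert( 'acknowledge', %job_a );
my $second = simulated_insert( 'acknowledge', %job_b );    # same id, row unchanged

# With 'no_check' the caller guards the insert with eval.
my $third = eval { simulated_insert( 'no_check', %job_b ) };
print "insert failed: $@" if !defined $third;
```

With acknowledge the second call returns the id of the first job and the stored arguments stay as they were; with no_check the caller has to handle the exception.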

Logging

TheSchwartz::JobScheduler uses the excellent Log::Any to produce logging messages.

The easiest way to get the logging messages printed is to add the following line in the preamble of your program:

    use Log::Any::Adapter ('Stdout', log_level => 'debug' );

Alternatively, you can do this on the command line:

    perl '-MLog::Any::Adapter(Stdout, log_level=>trace)'

databases

The databases used by TheSchwartz.

Please see above "Configuration: Databases and Their Handles".

dbh_callback

Callback for TheSchwartz::JobScheduler to get a database handle.

Please see above "Configuration: Databases and Their Handles" and "DBH Callback".

opts

Additional options for controlling other features, including uniqkey.

Please see above "Uniqkey".

Example:

    my $scheduler = TheSchwartz::JobScheduler->new(
        databases => \@databases,
        dbh_callback => 'Database::ManagedHandle->instance',
        opts => {
            handle_uniqkey => 'no_check',
        },
    );

insert

Create a job in the job queue. Returns the id of the job.

Parameters: job (TheSchwartz::JobScheduler::Job)

    my $job_id = $client->insert(
        job => TheSchwartz::JobScheduler::Job->new(
                    funcname => 'fetch',
                    arg      => {type => 'site', url => 'https://example.com/'},
                    ),
    );

funcname_to_id

Fetch the id of a function name from the database. If the function name does not exist yet, insert it first.
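
The get-or-create behaviour can be pictured with the following in-memory sketch. It only illustrates the idea; the hash standing in for the function map table and the name simulated_funcname_to_id are hypothetical, and the real method works against the database.

```perl
use strict;
use warnings;

# In-memory stand-in for the function map (function name => id).
my %funcmap;
my $next_funcid = 1;

# Hypothetical get-or-create lookup: assign a new id only when the
# function name has not been seen before.
sub simulated_funcname_to_id {
    my ($funcname) = @_;
    $funcmap{$funcname} //= $next_funcid++;
    return $funcmap{$funcname};
}

print simulated_funcname_to_id('fetch'), "\n";
print simulated_funcname_to_id('resize'), "\n";
print simulated_funcname_to_id('fetch'), "\n";
```

Repeated lookups of the same name return the same id, so callers never need to register a function name separately before inserting a job.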

list_jobs

Return a list of active jobs collected from all accessible databases.

Parameters: A hash containing named parameters.

    my @jobs = $client->list_jobs(
        search_params => { funcname => 'fetch_webpage'},
    );

THANKS

This module is very much inspired by TheSchwartz::Simple.

SEE ALSO

TheSchwartz
TheSchwartz::Simple

AUTHOR

Mikko Koivunalho <mikkoi@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2023 by Mikko Koivunalho.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.