pulpo-messaging
pulpo-messaging
is a lightweight framework for handling distributed delayed job processing. This can be useful for any background jobs that you need within your system.
Consider the following scenarios:
- After a customer purchases an item from your site, your system needs to process the order as a background job
- When a patient has a pending medication order, your system needs to perform medication/allergy checks
- As new metric data becomes available, your system needs to periodically create reports derived from raw data
Here are a list of general use cases:
- Offloading Heavy Tasks from Web Requests: In web applications, long-running tasks like sending emails, generating reports, or image processing can be offloaded to Beanstalkd. This helps in responding to user requests quickly, improving user experience by handling time-consuming tasks asynchronously.
- Task Scheduling and Delayed Execution: Beanstalkd can be used for tasks that need to be executed after a certain delay. For example, sending reminder emails, scheduled notifications, or delayed updates can be queued and executed at the specified time.
- Workload Distribution: Distributing tasks among multiple worker processes or servers is a common use case. Beanstalkd can help in load balancing by evenly distributing jobs across various workers, enhancing the efficiency and scalability of the system.
- Handling Spikes in Traffic: During periods of high traffic, Beanstalkd can be used to manage and queue incoming jobs, ensuring that the system remains responsive and stable by controlling the rate at which jobs are processed.
- Job Prioritization: In systems where certain tasks have higher priority, Beanstalkd’s priority queuing feature allows for critical jobs to be processed first, ensuring important tasks are completed in a timely manner.
- Decoupling System Components: Beanstalkd can act as a buffer and a communication medium between different parts of a system, helping in decoupling components. This separation of concerns makes systems easier to manage and scale.
- Batch Processing: Accumulating data and processing it in batches (like aggregating logs, analytics data, or processing bulk email sends) can be effectively managed using Beanstalkd.
- Microservices Communication: In a microservices architecture, Beanstalkd can be used for inter-service communication, where services can asynchronously push and pull messages or tasks to and from the queue.
- Reliability and Fault Tolerance: For applications requiring high reliability, Beanstalkd’s job reservation and time-to-run features ensure that jobs are not lost and are retried in case of worker failures.
- Background Data Processing: Performing data-intensive operations like data mining, file parsing, or ETL (Extract, Transform, Load) tasks in the background.
Features
Each of these uses follows a workflow of:
- request job (publish)
- delegate job to a software component that can handle the request based upon job characteristics.
pulpo-messaging
manages this workflow:
- Defines a flexible
message
model which includes:headers
for routing / control,body
for processing - Provides a
publish
API to request a job, which in turn stores within an internal queue. Publishing a message support a variety of featurees including specifying the request type, delaying a message, or specifying expiration. - Defines a
queue_adapter
interface, to allow for a variety of queue implementation - Provides a lightweight, file based
queue_adapter
implementation for low volume use cases - Provider a robust
queue_adapter
implementation based onbeanstalkd
- Defines a
handler
interface, to be implemented by integrator (you!) specific to your system logic - Handles result logic, including deleting the request on success and resubmitted the request on a transient exception
Roles
- Architect can map job type to handlers
- Developers can implement new handlers
- Producers can request jobs to be processed
- Producers can specify expiration of job requests
- Producers can specify delay (earliest date/time to execute job) of job requests
pulpo-messaging
processes requests based on priority order, and routes job requests to handlerpulpo-messaging
automatically retries on transient exceptions, and buries jobs on fatal exceptions
Concepts
job
: single delayed processing taskjob request
: request topulpo-messaging
to perform a single jobproducer
: system requesting jobhandler
: the set of Python code to fulfil the job request. This is where the extensibility of the system comes into play: you create your own handlers with your logic andpulpo-messaging
will delegate to your registered handlers.queue
:pulpo-messaging
uses a queue for holding the job requests which need to be processed.queue_adapter
: the queue adapter integratespulpo-messaging
with a specific queue implementation. Currently, there arequeue_adapters
for a light-weight, file based implementation (file_queue_adapter
) and a robust implementation utilizingbeanstalkd
(beanstalk_queue_adapter
)
API
Message
A message is the implementation of a job request
. Message is implemented a dictionary, with root elements of id
, header
, and body
.
Field | Parent | Specified by | Description |
---|---|---|---|
id | root | queue_adapter on enqueue | unique identifier |
header | root | various | stores content used for routing / flow control |
request_type | header | producer | defines the job that is being requests (i.e. send_email, print_shipping_label, etc) |
expiration | header | producer | Specifies the latest that a job may be processed. This is provided as an absolute date/time. |
delay | header | producer | Specifies the earlier that a job may be processed. Prior to this date/time, the message will not be dequeue. This is provided as an absolute date/time. |
priority | header | producer | Specifies the order by which jobs will be processed. 0 is the highest priority, the lowest priority being approx 4M. Negative numbers are treated as 0 |
attempts | header | queue_adapter | Tracks the number of (failed) attempts on a given message. This is likely only used by the file_queue_adapter. |
body | root | producer | defines the content the the handler will need to execute the job. This is stored as key-value pairs. For example, for a job that sends an email, the message could have a body with key-value pairs of “to”, “subject”, “body”. |
payload | body | producer | for simplistic jobs, payload acts as a single value for job request. |
Pulpo
Config
shutdown_after_number_of_empty_iterations
(int): pulpo looks for new jobs to process by iterating, checking the queue_adapter for new jobs. If there are multiple iterations with no messages (as specified by this setting), pulpo will shutdown (with the expectation that it would be automatically restarted).sleep_duration
(int): specifies the number of seconds to pause for each iteration when there are no messages available.queue_adapter_type(self)
(str): specifies the implementation of the queue_adapter. SpecifyFileQueueAdapter
to use the file based queue, orBeanstalkdQueueAdapter
to use the beanstalkd based queue.enable_output_buffering(self)
enable_banner
banner_name
banner_font
Publish
publish(message: Message) -> Message
=> enqueues the job request on to the queuemessage.request_type
: specifies the job that is being requested. This value is looked up against the configuration of the registry to determine the handlerpriority
(optional): specifies the order by which jobs will be processed.expiration
(optional): specifies the latest that a job may be processed.delay
(optional): specifies the earlier that a job may be processed.- publish returns a
Message
, which is the requested message with a populated message id
initialize
Beanstalk_Queue_Adapter
Config
- host
- port
- socket_timeout
- default_tube
- encoding
- reserve_timeout