Processes
pygeoapi-prefect can run two kinds of pygeoapi processes:
- standard pygeoapi processes
- prefect-powered processes
Running standard pygeoapi processes
Existing pygeoapi processes can be run by the pygeoapi-prefect process manager without any modification.
With this option, most prefect features are unavailable. Processes always run locally, and the only prefect features that can be used are its ephemeral API and its UI for monitoring process execution. Other features, such as deployments and result storage, are not available.
Nevertheless, you can start a prefect server and use its UI to monitor the execution of your pygeoapi processes.
Running prefect-powered processes
To take full advantage of pygeoapi-prefect, use its custom processor class as the base for your own processes.
Prefect-powered processes can use all prefect features, such as blocks, deployments and scheduling.
Using prefect-powered processes therefore involves the following steps:
- Write the code that performs the computation as a prefect flow
- Define a pygeoapi processor class
- Enable the processor in the pygeoapi configuration file
- Ensure prefect is running
- Deploy your flow
Implement process as a prefect flow
Note
We recommend keeping the prefect documentation at hand when defining new flows.
pygeoapi-prefect can use regular prefect flows, provided they meet the following requirements:
- The `@flow()` decorator must include at least the `persist_result=True` parameter;
- Flows must accept the following parameters, which the pygeoapi process manager passes to them:
  - `job_id: str` - the pygeoapi job id, as generated by its process manager
  - `result_storage_block: str` - the extended name (i.e. `<block-type-slug>/<block-name>`) of a prefect block that is to be used for storing whatever outputs are generated during execution
  - `process_description: ProcessDescription` - the details about the processor, as known by pygeoapi
  - `execution_request: ExecuteRequest` - execution-related details about the current pygeoapi job, to be used in the flow run
- Flows must store whatever results they produce using a prefect storage block;
- Flows must return an instance of `pygeoapi.models.processes.JobStatusInfoInternal`.
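A flow that lacks one of the required parameters only fails once the process manager calls it, so it can be handy to check its signature up front. The following is a minimal sketch of a hypothetical helper, `missing_flow_parameters`, which is not part of pygeoapi-prefect; it relies only on the standard library `inspect` module and takes a plain function. For a prefect flow object you would pass the wrapped function, which we assume is available as `.fn` (as in prefect 2).

```python
from __future__ import annotations

import inspect
from typing import Callable

# Parameter names the pygeoapi process manager passes to every flow run
REQUIRED_FLOW_PARAMETERS = (
    "job_id",
    "result_storage_block",
    "process_description",
    "execution_request",
)


def missing_flow_parameters(func: Callable) -> list[str]:
    """Return the required parameter names that ``func`` does not declare.

    Hypothetical helper, not part of pygeoapi-prefect.
    """
    params = inspect.signature(func).parameters
    # A **kwargs parameter accepts any keyword argument, so nothing is missing
    if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []
    return [name for name in REQUIRED_FLOW_PARAMETERS if name not in params]


def good_flow(job_id, result_storage_block, process_description, execution_request):
    """Accepts every parameter the process manager passes."""


def bad_flow(job_id, execution_request):
    """Forgets two of the required parameters."""


print(missing_flow_parameters(good_flow))  # []
print(missing_flow_parameters(bad_flow))  # ['result_storage_block', 'process_description']
```

Such a check does not cover the `persist_result=True` requirement, which lives in the decorator rather than in the signature.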
A simple example:
```python
from pathlib import Path

from prefect import flow, get_run_logger
from prefect.blocks.core import Block
from prefect.filesystems import LocalFileSystem
from pygeoapi.models import processes as schemas
from pygeoapi.process import exceptions


@flow(persist_result=True)
def simple_flow(
    job_id: str,
    result_storage_block: str | None,
    process_description: schemas.ProcessDescription,
    execution_request: schemas.ExecuteRequest,
) -> schemas.JobStatusInfoInternal:
    """This is a simple prefect flow.

    It complies with pygeoapi-prefect's requirements, namely:

    - The @flow decorator includes `persist_result=True`
    - Accepts the required input parameters (job_id, result_storage_block,
      process_description, execution_request)
    - Stores outputs using a prefect block
    - Returns a status_info
    """
    logger = get_run_logger()
    logger.debug("Starting execution...")
    # 1. retrieve inputs
    # 1.1. some may be mandatory
    try:
        name = execution_request.inputs["name"].__root__
    except KeyError:
        raise exceptions.MissingJobParameterError("Cannot process without a name")
    else:
        # 1.2. others may be optional
        msg = execution_request.inputs.get("message")
        message = msg.__root__ if msg is not None else ""
    # 2. determine where results will be stored
    if result_storage_block is not None:
        storage = Block.load(result_storage_block)
    else:
        # fall back to the local filesystem; an explicit basepath keeps the
        # output location reported in step 5 from starting with "None"
        storage = LocalFileSystem(basepath=str(Path.cwd()))
    # 3. Get to work! - Perform the generation of outputs
    result_value = f"Hello {name}! {message}".strip()
    # 4. Store the generated outputs using a prefect block
    result_path = f"{job_id}/output-result.txt"
    storage.write_path(result_path, result_value.encode("utf-8"))
    # 5. Return a status info object
    return schemas.JobStatusInfoInternal(
        jobID=job_id,
        processID=process_description.id,
        status=schemas.JobStatus.successful,
        generated_outputs={
            "result": schemas.OutputExecutionResultInternal(
                location=f"{storage.basepath}/{result_path}",
                media_type=(
                    process_description.outputs["result"].schema_.content_media_type
                ),
            )
        },
    )
```
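Most of the flow body is plain Python, so one possible design is to factor the input handling and output naming into ordinary functions that can be unit-tested without a prefect server. The sketch below is our own illustration, not part of pygeoapi-prefect: `build_greeting` and `result_location` are hypothetical names, the inputs are plain strings rather than the pydantic models `simple_flow` receives, and a missing name raises `ValueError` instead of pygeoapi's `MissingJobParameterError`.

```python
from __future__ import annotations


def build_greeting(inputs: dict[str, str]) -> str:
    """Steps 1 and 3 of simple_flow, on plain values (hypothetical helper)."""
    try:
        name = inputs["name"]  # mandatory input
    except KeyError:
        raise ValueError("Cannot process without a name") from None
    message = inputs.get("message", "")  # optional input
    return f"Hello {name}! {message}".strip()


def result_location(basepath: str, job_id: str,
                    filename: str = "output-result.txt") -> str:
    """Mirror the '<basepath>/<job_id>/<filename>' layout used by simple_flow."""
    return f"{basepath}/{job_id}/{filename}"


print(build_greeting({"name": "Maria"}))  # Hello Maria!
print(build_greeting({"name": "Maria", "message": "Have a nice day"}))  # Hello Maria! Have a nice day
print(result_location("/data/results", "1234"))  # /data/results/1234/output-result.txt
```

The flow would then only unwrap its inputs, call these helpers and handle storage, keeping the prefect-specific code thin.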
Define pygeoapi processor class
To be usable by pygeoapi-prefect, the prefect flow defined above must be wrapped in a custom pygeoapi processor. This processor must:
- Derive from `BasePrefectProcessor`. This custom base class provides prefect-related functionality;
- Have a `process_flow` class variable. This is a reference to the prefect flow function that does the work. See the previous section on how to implement a suitable flow;
- Have a `process_description` class variable, which holds the metadata pygeoapi needs to describe the process. This must be an instance of `ProcessDescription`, describing the process, including its inputs and outputs. The only major requirement here is that the description's `id` property must match the name of the process, as specified in the pygeoapi configuration file.
A simple example, meant to work together with the prefect flow defined earlier:
Warning
Inside your custom processor modules, be sure to use absolute imports; otherwise the prefect deployment may not be able to find all of your code's dependencies.
```python
from prefect import flow
from pygeoapi.models import processes as schemas
from pygeoapi_prefect.process.base import BasePrefectProcessor


@flow(persist_result=True)
def simple_flow(
    job_id: str,
    result_storage_block: str | None,
    process_description: schemas.ProcessDescription,
    execution_request: schemas.ExecuteRequest,
) -> schemas.JobStatusInfoInternal:
    ...  # omitted for brevity, see above for the full implementation


class SimpleFlowProcessor(BasePrefectProcessor):
    process_flow = simple_flow
    process_description = schemas.ProcessDescription(
        id="simple-flow",  # id MUST match key given in pygeoapi config
        version="0.0.1",
        title="Simple flow Processor",
        jobControlOptions=[
            schemas.ProcessJobControlOption.SYNC_EXECUTE,
            schemas.ProcessJobControlOption.ASYNC_EXECUTE,
        ],
        inputs={
            "name": schemas.ProcessInput(
                title="Name",
                schema=schemas.ProcessIOSchema(type=schemas.ProcessIOType.STRING),
            ),
            "message": schemas.ProcessInput(
                title="Message",
                schema=schemas.ProcessIOSchema(type=schemas.ProcessIOType.STRING),
                minOccurs=0,
            ),
        },
        outputs={
            "result": schemas.ProcessOutput(
                schema=schemas.ProcessIOSchema(
                    type=schemas.ProcessIOType.STRING,
                    contentMediaType="text/plain",
                )
            )
        },
    )
```
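Because a mismatch between `process_description.id` and the configuration key is easy to miss, a small consistency check may help. This is a sketch under two assumptions: the pygeoapi configuration has already been loaded into a dict (for example with a YAML parser), and you have collected each processor's `process_description.id` keyed by its dotted path (e.g. by reading `SimpleFlowProcessor.process_description.id`). `process_id_mismatches` and the `my_package.processes` module path are hypothetical and not part of pygeoapi or pygeoapi-prefect.

```python
from __future__ import annotations


def process_id_mismatches(
    resources: dict, description_ids: dict[str, str]
) -> list[tuple[str, str | None]]:
    """Return (config_key, description_id) pairs that do not match.

    ``resources`` is the ``resources`` section of a loaded pygeoapi config;
    ``description_ids`` maps processor dotted paths to process_description.id.
    Hypothetical helper.
    """
    mismatches = []
    for key, resource in resources.items():
        if resource.get("type") != "process":
            continue  # only process resources need a matching id
        dotted_path = resource["processor"]["name"]
        description_id = description_ids.get(dotted_path)  # None if unknown
        if description_id != key:
            mismatches.append((key, description_id))
    return mismatches


resources = {
    "simple-flow": {
        "type": "process",
        "processor": {"name": "my_package.processes.SimpleFlowProcessor"},
    },
    "obs": {"type": "collection"},  # non-process resources are ignored
}
ids = {"my_package.processes.SimpleFlowProcessor": "simple-flow"}
print(process_id_mismatches(resources, ids))  # []
typo = {"my_package.processes.SimpleFlowProcessor": "simple_flow"}
print(process_id_mismatches(resources, typo))  # [('simple-flow', 'simple_flow')]
```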
Enable process in pygeoapi
To be usable, a process must be specified in the pygeoapi configuration file. pygeoapi-prefect recognizes the following process-related configuration keys:
- `processor.name` - Dotted path to the pygeoapi processor class to be used
- `processor.prefect.result_storage` - Identifier (`<block-type-slug>/<block-name>`) of the prefect storage block that is to be used for storing generated outputs. Flow runs must always use a storage block to store execution outputs
- `processor.prefect.deployment.name` - Name of the prefect deployment that will be created/used
- `processor.prefect.deployment.queue` - Name of the prefect queue to which flow runs will be scheduled
- `processor.prefect.deployment.storage_block` - Identifier of the prefect storage block that is to be used for storing the flow deployment
- `processor.prefect.deployment.storage_sub_path` - Path for storing the flow deployment
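The storage block identifiers in these settings use prefect's extended block name format, `<block-type-slug>/<block-name>`, which is also the form `Block.load` receives in the flow example. The sketch below shows how such an identifier decomposes and how malformed values might be rejected early. `split_block_identifier` is a hypothetical helper, not something pygeoapi-prefect is known to provide, and rejecting a second `/` is our own assumption about valid block names.

```python
from __future__ import annotations


def split_block_identifier(identifier: str) -> tuple[str, str]:
    """Split '<block-type-slug>/<block-name>' into its two parts.

    Hypothetical helper; raises ValueError for malformed identifiers.
    """
    block_type_slug, sep, block_name = identifier.partition("/")
    # Require exactly one '/' with non-empty text on both sides
    if not sep or not block_type_slug or not block_name or "/" in block_name:
        raise ValueError(
            f"Expected '<block-type-slug>/<block-name>', got {identifier!r}"
        )
    return block_type_slug, block_name


print(split_block_identifier("remote-file-system/test-sb1-results"))
# ('remote-file-system', 'test-sb1-results')
```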
A simple example:
```yaml
# pygeoapi configuration file
resources:
  # id of the process MUST be the same as the `id` property of the processor's
  # `process_description`
  hi-prefect-world:
    type: process
    processor:
      name: pygeoapi_prefect.examples.hi_prefect_world.HiPrefectWorldProcessor
      prefect:
        result_storage: remote-file-system/test-sb1-results
        deployment:
          name: minio
          queue: pygeoapi
          storage_block: remote-file-system/test-sb1
          storage_sub_path: hi-prefect-world-flow
```
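When debugging a deployment, it can help to see these settings gathered in one object. The following sketch assumes the YAML above has already been parsed into a Python dict and collects the process-related keys into a hypothetical `PrefectProcessSettings` dataclass. The class and its `from_resource` method are illustrative only and do not reflect how pygeoapi-prefect reads its configuration internally. `result_storage` is treated as mandatory because flow runs must always use a storage block; leaving the deployment keys optional is an assumption on our part.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PrefectProcessSettings:
    """Process-related settings from one pygeoapi resource (hypothetical)."""

    processor: str
    result_storage: str
    deployment_name: str | None = None
    deployment_queue: str | None = None
    deployment_storage_block: str | None = None
    deployment_storage_sub_path: str | None = None

    @classmethod
    def from_resource(cls, resource: dict) -> PrefectProcessSettings:
        processor = resource["processor"]
        prefect = processor.get("prefect", {})
        if "result_storage" not in prefect:
            # Flow runs must always store their outputs in a storage block
            raise ValueError("processor.prefect.result_storage is required")
        deployment = prefect.get("deployment", {})
        return cls(
            processor=processor["name"],
            result_storage=prefect["result_storage"],
            deployment_name=deployment.get("name"),
            deployment_queue=deployment.get("queue"),
            deployment_storage_block=deployment.get("storage_block"),
            deployment_storage_sub_path=deployment.get("storage_sub_path"),
        )


# The 'hi-prefect-world' resource from the YAML example, as a parsed dict
resource = {
    "type": "process",
    "processor": {
        "name": "pygeoapi_prefect.examples.hi_prefect_world.HiPrefectWorldProcessor",
        "prefect": {
            "result_storage": "remote-file-system/test-sb1-results",
            "deployment": {
                "name": "minio",
                "queue": "pygeoapi",
                "storage_block": "remote-file-system/test-sb1",
                "storage_sub_path": "hi-prefect-world-flow",
            },
        },
    },
}
settings = PrefectProcessSettings.from_resource(resource)
print(settings.result_storage)  # remote-file-system/test-sb1-results
```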