list_spark_jobs(options?)

List your Spark job scripts with pagination.
jobs = await client.list_spark_jobs()
print(jobs.scripts, jobs.has_more)

# Paginate through all scripts
from oleander_sdk import ListSparkJobsOptions

page_size = 50
offset = 0
all_scripts = []
while True:
    page = await client.list_spark_jobs(ListSparkJobsOptions(limit=page_size, offset=offset))
    all_scripts.extend(page.scripts)
    if not page.has_more:
        break
    offset += page_size

Parameters

options.limit
int
default:"20"
Number of scripts to return per page. Must be greater than 0.
options.offset
int
default:"0"
Number of scripts to skip for pagination. Must be 0 or greater.

Return type: ListSparkJobsResult

Field      Type        Description
scripts    list[str]   Script names for the current page
has_more   bool        Whether more scripts are available

submit_spark_job(options)

Submit a Spark job for execution. Returns a SparkJobRun with a run_id that you can use with get_run() to poll status.
from oleander_sdk import SparkJobSubmitOptions

result = await client.submit_spark_job(SparkJobSubmitOptions(
    namespace="my-namespace",
    name="daily-etl",
    script_name="etl_pipeline.py",
    args=["--date", "2025-01-15"],
    executor_numbers=4,
))

print(result.run_id)

# Check the current status (see the polling sketch below)
run = await client.get_run(result.run_id)
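
If you need to wait for completion yourself instead of using submit_spark_job_and_wait(), a minimal polling sketch might look like this (the terminal-state set and the 10-second sleep are assumptions; tune them for your jobs):
import asyncio

# Terminal states as documented for submit_spark_job_and_wait
TERMINAL_STATES = {"COMPLETE", "FAIL", "ABORT"}

run = await client.get_run(result.run_id)
while run.state not in TERMINAL_STATES:
    await asyncio.sleep(10)  # assumed poll interval; pick what suits your workload
    run = await client.get_run(result.run_id)

print(run.state, run.duration)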

Parameters

options.namespace
str
required
Job namespace. Must be non-empty.
options.name
str
required
Job name. Must be non-empty.
options.script_name
str
required
Name of the script to execute. Must be non-empty.
options.args
list[str]
default:"[]"
Arguments to pass to the script.
options.driver_machine_type
SparkMachineType
default:"spark.1.b"
Machine type for the Spark driver.
options.executor_machine_type
SparkMachineType
default:"spark.1.b"
Machine type for Spark executors.
options.executor_numbers
int
default:"2"
Number of executors (1-20).
options.job_tags
list[str]
default:"[]"
Tags applied to the job.
options.run_tags
list[str]
default:"[]"
Tags applied to this run.

Return type: SparkJobRun

Field    Type   Description
run_id   str    The ID of the submitted run
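
The job_tags and run_tags parameters above are plain strings; as a sketch, tags could be attached at submission like this (the tag values are illustrative):
from oleander_sdk import SparkJobSubmitOptions

result = await client.submit_spark_job(SparkJobSubmitOptions(
    namespace="my-namespace",
    name="daily-etl",
    script_name="etl_pipeline.py",
    job_tags=["team:data-platform"],      # illustrative tag values
    run_tags=["backfill:2025-01-15"],
))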

Machine types

The SparkMachineType enum covers compute-optimized (c), balanced (b), and memory-optimized (m) options:
Type                                   vCPUs   Category
spark.1.c / spark.1.b / spark.1.m      1       Compute / Balanced / Memory
spark.2.c / spark.2.b / spark.2.m      2       Compute / Balanced / Memory
spark.4.c / spark.4.b / spark.4.m      4       Compute / Balanced / Memory
spark.8.c / spark.8.b / spark.8.m      8       Compute / Balanced / Memory
spark.16.c / spark.16.b / spark.16.m   16      Compute / Balanced / Memory

For example, to pair a larger driver with memory-optimized executors:
from oleander_sdk import SparkJobSubmitOptions, SparkMachineType

result = await client.submit_spark_job(SparkJobSubmitOptions(
    namespace="my-namespace",
    name="heavy-etl",
    script_name="etl_pipeline.py",
    driver_machine_type=SparkMachineType.SPARK_4_B,
    executor_machine_type=SparkMachineType.SPARK_8_M,
    executor_numbers=10,
))

submit_spark_job_and_wait(options)

Submit a Spark job and poll until it reaches a terminal state (COMPLETE, FAIL, or ABORT). Raises TimeoutError if the timeout is exceeded.
from oleander_sdk import SubmitSparkJobAndWaitOptions

result = await client.submit_spark_job_and_wait(SubmitSparkJobAndWaitOptions(
    namespace="my-namespace",
    name="daily-etl",
    script_name="etl_pipeline.py",
    poll_interval_ms=5000,
    timeout_ms=300000,
))

if result.state == "COMPLETE":
    elapsed = result.run.duration  # seconds
    # proceed with downstream work ...
else:
    raise Exception(f"Run {result.run_id} ended with state: {result.state}")
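
Because the call raises TimeoutError once timeout_ms elapses, you may want to catch it explicitly. Continuing the example above, a minimal sketch (whether the run keeps executing server-side after a client-side timeout is an assumption to verify for your deployment):
try:
    result = await client.submit_spark_job_and_wait(SubmitSparkJobAndWaitOptions(
        namespace="my-namespace",
        name="daily-etl",
        script_name="etl_pipeline.py",
        timeout_ms=300000,
    ))
except TimeoutError:
    # The run may still be in progress server-side (verify for your deployment).
    # If you need the run_id even on timeout, submit with submit_spark_job()
    # and poll get_run() yourself.
    ...
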
Accepts all submit_spark_job parameters plus:
options.poll_interval_ms
int
default:"10000"
Milliseconds between status polls. Must be greater than 0.
options.timeout_ms
int
default:"600000"
Maximum time to wait in milliseconds before raising TimeoutError.

Return type: SubmitAndWaitResult

Field    Type          Description
run_id   str           The ID of the submitted run
state    str           Terminal state (COMPLETE, FAIL, or ABORT)
run      RunResponse   Full run details

get_run(run_id)

Get the current status of a run. Use this to poll a job submitted with submit_spark_job().
run = await client.get_run(run_id)

if run.state == "COMPLETE":
    duration = run.duration  # seconds
    job_name = run.job.name
    # handle completion ...
elif run.state == "FAIL":
    error = run.error
    # handle failure ...

Return type: RunResponse

Field          Type              Description
id             str               Run ID
state          Optional[str]     Current state (COMPLETE, FAIL, ABORT, etc.)
started_at     Optional[str]     ISO timestamp when the run started
queued_at      Optional[str]     ISO timestamp when the run was queued
scheduled_at   Optional[str]     ISO timestamp when the run was scheduled
ended_at       Optional[str]     ISO timestamp when the run ended
duration       Optional[float]   Run duration in seconds
error          Optional[Any]     Error details if the run failed
tags           list[RunTag]      List of tags with key, value, and optional source
job            RunJobInfo        Job info with id, name, namespace
pipeline       RunPipelineInfo   Pipeline info with id, name, namespace
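
A sketch of reading the richer RunResponse fields; it assumes the ISO timestamps parse with datetime.fromisoformat, which may need adjusting depending on the exact timestamp format:
from datetime import datetime

run = await client.get_run(run_id)

# Job metadata and tags
print(run.job.namespace, run.job.name)
for tag in run.tags:
    print(tag.key, tag.value)

# Timestamps are ISO strings; duration is reported in seconds
if run.started_at and run.ended_at:
    started = datetime.fromisoformat(run.started_at)
    ended = datetime.fromisoformat(run.ended_at)
    print((ended - started).total_seconds(), run.duration)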