get_spark_cluster(name_or_options)

Look up Spark cluster details before submitting a job. Use oleander for the built-in managed cluster or the name of a registered external cluster.
cluster = await client.get_spark_cluster("emr-prod")
print(cluster.type, cluster.properties)

Return shape

| Field | Type | Description |
| --- | --- | --- |
| name | str | Cluster name |
| type | SparkClusterType | oleander, emr-serverless, or glue |
| properties | Any | Cluster-specific configuration |
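The type field tells you which entrypoint format submit_spark_job will expect for that cluster. A minimal sketch restating the rules from this page (the plain-string comparison against cluster.type is an assumption; in real code check the SDK's SparkClusterType enum):

```python
def expected_entrypoint_kind(cluster_type: str) -> str:
    # Restates the entrypoint rules documented for submit_spark_job.
    kinds = {
        "oleander": "uploaded script name",
        "emr-serverless": "S3 URI",
        "glue": "Glue job name",
    }
    return kinds[cluster_type]

expected_entrypoint_kind("emr-serverless")  # 'S3 URI'
```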

list_spark_jobs(options?)

List your uploaded Spark job scripts with pagination.
jobs = await client.list_spark_jobs()
print(jobs.scripts, jobs.has_more)

from oleander_sdk import ListSparkJobsOptions

offset = 0
all_scripts = []
while True:
    page = await client.list_spark_jobs(
        ListSparkJobsOptions(limit=50, offset=offset)
    )
    all_scripts.extend(page.scripts)
    if not page.has_more:
        break
    offset += 50

Parameters

options.limit
int (default: 20)
Number of scripts to return per page. Must be greater than 0.

options.offset
int (default: 0)
Number of scripts to skip for pagination. Must be 0 or greater.

Return type: ListSparkJobsResult

| Field | Type | Description |
| --- | --- | --- |
| scripts | list[str] | Script names for the current page |
| has_more | bool | Whether more scripts are available |

submit_spark_job(options)

Submit a Spark job for execution. cluster defaults to oleander. For oleander-managed Spark, entrypoint is the uploaded script name. For external clusters, entrypoint is cluster-specific, such as an S3 URI for EMR Serverless or a Glue job name for Glue. The canonical field is entrypoint. script_name is still accepted as a compatibility alias.
from oleander_sdk import SparkJobSubmitOptions

result = await client.submit_spark_job(SparkJobSubmitOptions(
    namespace="my-namespace",
    name="daily-etl",
    entrypoint="etl_pipeline.py",
    args=["--date", "2026-03-11"],
    executor_numbers=4,
))

print(result.run_id)
run = await client.get_run(result.run_id)

Common options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| cluster | str | "oleander" | Managed cluster or registered cluster name |
| namespace | str | (required) | Job namespace |
| name | str | (required) | Job name |
| entrypoint | str | (required) | Script name, S3 URI, or Glue job name depending on cluster |
| args | list[str] | [] | Entrypoint arguments |
| spark_conf | list[str] | [] | Spark configuration values |
| packages | list[str] | [] | Extra package coordinates |
| job_tags | list[str] | [] | Tags applied to the job |
| run_tags | list[str] | [] | Tags applied to this run |

Oleander-managed options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| driver_machine_type | SparkMachineType | spark.1.b | Driver machine type |
| executor_machine_type | SparkMachineType | spark.1.b | Executor machine type |
| executor_numbers | int | 2 | Number of executors, from 1 to 20 |

EMR Serverless options

| Option | Type | Description |
| --- | --- | --- |
| py_files | str | Zip archive of Python dependencies |
| main_class | str | Main class for JVM jobs |
| execution_iam_policy | str | IAM policy applied to execution |

Glue options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| worker_type | GlueWorkerType | | Glue worker type |
| number_of_workers | int | 1 | Number of Glue workers |
| enable_auto_scaling | Optional[bool] | | Enable Glue auto scaling |
| timeout_minutes | Optional[int] | | Timeout in minutes |
| execution_class | Optional[str] | | Glue execution class such as STANDARD or FLEX |
| execution_iam_policy | Optional[str] | | IAM policy applied to execution |
For Glue jobs, args are converted into key-value pairs. Pass them as alternating entries such as ["--source", "s3://bucket/input", "--target", "s3://bucket/output"].
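That alternating-args convention can be sketched as a small pairing helper. This is illustrative only (the function name is hypothetical; the SDK performs the conversion internally when the cluster targets Glue):

```python
def pair_glue_args(args: list[str]) -> dict[str, str]:
    """Pair alternating --key/value entries into the key-value form Glue expects."""
    if len(args) % 2 != 0:
        raise ValueError("Glue args must alternate --key and value entries")
    return {args[i]: args[i + 1] for i in range(0, len(args), 2)}

pair_glue_args(["--source", "s3://bucket/input", "--target", "s3://bucket/output"])
# {'--source': 's3://bucket/input', '--target': 's3://bucket/output'}
```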

External cluster example

result = await client.submit_spark_job(SparkJobSubmitOptions(
    cluster="emr-prod",
    namespace="finance",
    name="daily-etl",
    entrypoint="s3://my-bucket/jobs/etl_pipeline.py",
    args=["--date", "2026-03-11"],
    py_files="s3://my-bucket/jobs/deps.zip",
    packages=["org.example:my-lib:1.0.0"],
))

Return type: SparkJobRun

| Field | Type | Description |
| --- | --- | --- |
| run_id | str | The ID of the submitted run |

Machine types

The SparkMachineType enum covers compute-optimized (c), balanced (b), and memory-optimized (m) options:
| Type | vCPUs | Category |
| --- | --- | --- |
| spark.1.c / spark.1.b / spark.1.m | 1 | Compute / Balanced / Memory |
| spark.2.c / spark.2.b / spark.2.m | 2 | Compute / Balanced / Memory |
| spark.4.c / spark.4.b / spark.4.m | 4 | Compute / Balanced / Memory |
| spark.8.c / spark.8.b / spark.8.m | 8 | Compute / Balanced / Memory |
| spark.16.c / spark.16.b / spark.16.m | 16 | Compute / Balanced / Memory |
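The names follow the pattern spark.&lt;vCPUs&gt;.&lt;category&gt;, which can be expressed as a small builder. A sketch only: in real code pass the SparkMachineType enum member itself rather than a raw string (this helper is hypothetical):

```python
def spark_machine_type(vcpus: int, category: str) -> str:
    # Machine type names follow the pattern spark.<vCPUs>.<category>,
    # where the category is "c" (compute), "b" (balanced), or "m" (memory).
    if vcpus not in (1, 2, 4, 8, 16):
        raise ValueError("vCPUs must be 1, 2, 4, 8, or 16")
    if category not in ("c", "b", "m"):
        raise ValueError("category must be 'c', 'b', or 'm'")
    return f"spark.{vcpus}.{category}"

spark_machine_type(8, "m")  # 'spark.8.m'
```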

submit_spark_job_and_wait(options)

Submit a Spark job and poll until it reaches a terminal state (COMPLETE, FAIL, or ABORT). Raises TimeoutError if the timeout is exceeded.
from oleander_sdk import SubmitSparkJobAndWaitOptions

result = await client.submit_spark_job_and_wait(SubmitSparkJobAndWaitOptions(
    namespace="my-namespace",
    name="daily-etl",
    entrypoint="etl_pipeline.py",
    poll_interval_ms=5000,
    timeout_ms=300000,
))

if result.state == "COMPLETE":
    elapsed = result.run.duration
    # proceed with downstream work ...
else:
    raise RuntimeError(f"Run {result.run_id} ended with state: {result.state}")
Accepts all submit_spark_job options plus:
options.poll_interval_ms
int (default: 10000)
Milliseconds between status polls. Must be greater than 0.

options.timeout_ms
int (default: 600000)
Maximum time to wait in milliseconds before raising TimeoutError.

Return type: SubmitAndWaitResult

| Field | Type | Description |
| --- | --- | --- |
| run_id | str | The ID of the submitted run |
| state | str | Terminal state (COMPLETE, FAIL, or ABORT) |
| run | RunResponse | Full run details |
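The wait loop behaves roughly as below. This is a sketch of the polling pattern, not the SDK's implementation: get_state stands in for any async callable that returns the run's current state string (for example, an async wrapper around client.get_run(run_id) that returns run.state):

```python
import asyncio

TERMINAL_STATES = {"COMPLETE", "FAIL", "ABORT"}

async def wait_for_terminal(get_state, poll_interval_ms=10_000, timeout_ms=600_000):
    # Poll get_state() until the run reaches a terminal state, or raise
    # TimeoutError once the deadline passes.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_ms / 1000
    while True:
        state = await get_state()
        if state in TERMINAL_STATES:
            return state
        if loop.time() >= deadline:
            raise TimeoutError(f"run not terminal after {timeout_ms} ms")
        await asyncio.sleep(poll_interval_ms / 1000)
```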

get_run(run_id)

Get the current status of a run. Use this to poll a job submitted with submit_spark_job().
run = await client.get_run(run_id)

if run.state == "COMPLETE":
    duration = run.duration
    job_name = run.job.name
    # handle completion ...
elif run.state == "FAIL":
    error = run.error
    # handle failure ...

Return type: RunResponse

| Field | Type | Description |
| --- | --- | --- |
| id | str | Run ID |
| state | Optional[str] | Current state |
| started_at | Optional[str] | ISO timestamp when the run started |
| queued_at | Optional[str] | ISO timestamp when the run was queued |
| scheduled_at | Optional[str] | ISO timestamp when the run was scheduled |
| ended_at | Optional[str] | ISO timestamp when the run ended |
| duration | Optional[float] | Run duration in seconds |
| error | Optional[Any] | Error details if the run failed |
| tags | list[RunTag] | List of tags with key, value, and optional source |
| job | RunJobInfo | Job info with id, name, and namespace |
| pipeline | RunPipelineInfo | Pipeline info with id, name, and namespace |
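The duration field already reports run time in seconds, but wall time can also be derived from the timestamp fields. A minimal sketch, assuming the ISO 8601 timestamps parse with datetime.fromisoformat (true for offset-aware forms such as "2026-03-11T08:00:00+00:00"):

```python
from datetime import datetime

def run_wall_seconds(started_at: str, ended_at: str) -> float:
    # Derive wall-clock seconds from the RunResponse ISO timestamps.
    start = datetime.fromisoformat(started_at)
    end = datetime.fromisoformat(ended_at)
    return (end - start).total_seconds()

run_wall_seconds("2026-03-11T08:00:00+00:00", "2026-03-11T08:05:30+00:00")  # 330.0
```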