## get_spark_cluster(name_or_options)

Look up Spark cluster details before submitting a job. Pass `"oleander"` for the built-in managed cluster, or the name of a registered external cluster.

```python
cluster = await client.get_spark_cluster("emr-prod")
print(cluster.type, cluster.properties)
```
### Return shape

| Field | Type | Description |
|---|---|---|
| name | str | Cluster name |
| type | SparkClusterType | `oleander`, `emr-serverless`, or `glue` |
| properties | Any | Cluster-specific configuration |
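The cluster type also determines what shape of `entrypoint` a later `submit_spark_job()` call expects. A minimal sketch of that branching; the `expected_entrypoint_kind` helper and its format rules are illustrative, not part of the SDK:

```python
def expected_entrypoint_kind(cluster_type: str) -> str:
    """Illustrative mapping from cluster type to entrypoint format.

    Mirrors the submit_spark_job() documentation: uploaded script name for
    oleander, S3 URI for emr-serverless, Glue job name for glue.
    """
    kinds = {
        "oleander": "uploaded script name",
        "emr-serverless": "s3 uri",
        "glue": "glue job name",
    }
    return kinds[cluster_type]
```

A caller could check `expected_entrypoint_kind(cluster.type)` before building submission options for an unfamiliar cluster.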
## list_spark_jobs(options?)

List your uploaded Spark job scripts, with pagination.

```python
jobs = await client.list_spark_jobs()
print(jobs.scripts, jobs.has_more)
```
```python
from oleander_sdk import ListSparkJobsOptions

offset = 0
all_scripts = []
while True:
    page = await client.list_spark_jobs(
        ListSparkJobsOptions(limit=50, offset=offset)
    )
    all_scripts.extend(page.scripts)
    if not page.has_more:
        break
    offset += 50
```
### Parameters

- `limit` (int): Number of scripts to return per page. Must be greater than 0.
- `offset` (int): Number of scripts to skip for pagination. Must be 0 or greater.
### Return type: ListSparkJobsResult

| Field | Type | Description |
|---|---|---|
| scripts | list[str] | Script names for the current page |
| has_more | bool | Whether more scripts are available |
## submit_spark_job(options)

Submit a Spark job for execution. `cluster` defaults to `"oleander"`.

For Oleander-managed Spark, `entrypoint` is the name of an uploaded script. For external clusters, `entrypoint` is cluster-specific: an S3 URI for EMR Serverless, or a Glue job name for Glue.

The canonical field is `entrypoint`; `script_name` is still accepted as a compatibility alias.
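The alias behaves like a simple fallback to the canonical field. A sketch of the equivalent resolution logic; the `resolve_entrypoint` helper is hypothetical, not an SDK function:

```python
def resolve_entrypoint(entrypoint=None, script_name=None):
    """Prefer the canonical `entrypoint`; fall back to the legacy `script_name`."""
    if entrypoint is not None and script_name is not None and entrypoint != script_name:
        raise ValueError("entrypoint and script_name conflict")
    value = entrypoint if entrypoint is not None else script_name
    if value is None:
        raise ValueError("entrypoint is required")
    return value
```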
```python
from oleander_sdk import SparkJobSubmitOptions

result = await client.submit_spark_job(SparkJobSubmitOptions(
    namespace="my-namespace",
    name="daily-etl",
    entrypoint="etl_pipeline.py",
    args=["--date", "2026-03-11"],
    executor_numbers=4,
))
print(result.run_id)

# Poll the run later with get_run()
run = await client.get_run(result.run_id)
```
### Common options

| Option | Type | Default | Description |
|---|---|---|---|
| cluster | str | "oleander" | Managed cluster or registered cluster name |
| namespace | str | | Job namespace |
| name | str | | Job name |
| entrypoint | str | | Script name, S3 URI, or Glue job name, depending on cluster |
| args | list[str] | [] | Entrypoint arguments |
| spark_conf | list[str] | [] | Spark configuration values |
| packages | list[str] | [] | Extra package coordinates |
| job_tags | list[str] | [] | Tags applied to the job |
| run_tags | list[str] | [] | Tags applied to this run |
### Oleander-managed options

| Option | Type | Default | Description |
|---|---|---|---|
| driver_machine_type | SparkMachineType | spark.1.b | Driver machine type |
| executor_machine_type | SparkMachineType | spark.1.b | Executor machine type |
| executor_numbers | int | 2 | Number of executors, from 1 to 20 |
### EMR Serverless options

| Option | Type | Description |
|---|---|---|
| py_files | str | Zip archive of Python dependencies |
| main_class | str | Main class for JVM jobs |
| execution_iam_policy | str | IAM policy applied to execution |
### Glue options

| Option | Type | Default | Description |
|---|---|---|---|
| worker_type | GlueWorkerType | | Glue worker type |
| number_of_workers | int | 1 | Number of Glue workers |
| enable_auto_scaling | Optional[bool] | | Enable Glue auto scaling |
| timeout_minutes | Optional[int] | | Timeout in minutes |
| execution_class | Optional[str] | | Glue execution class such as STANDARD or FLEX |
| execution_iam_policy | Optional[str] | | IAM policy applied to execution |
For Glue jobs, `args` are converted into key-value pairs. Pass them as alternating entries, such as `["--source", "s3://bucket/input", "--target", "s3://bucket/output"]`.
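That conversion can be pictured as the following sketch; the `glue_args_to_dict` helper is illustrative, not part of the SDK:

```python
def glue_args_to_dict(args):
    """Pair up alternating ["--key", "value", ...] entries, mirroring the
    key-value form Glue expects for job arguments."""
    if len(args) % 2 != 0:
        raise ValueError("Glue args must come in alternating key/value pairs")
    return {args[i]: args[i + 1] for i in range(0, len(args), 2)}
```

An odd-length list has a dangling key with no value, which is why it is rejected outright rather than silently dropped.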
### External cluster example

```python
result = await client.submit_spark_job(SparkJobSubmitOptions(
    cluster="emr-prod",
    namespace="finance",
    name="daily-etl",
    entrypoint="s3://my-bucket/jobs/etl_pipeline.py",
    args=["--date", "2026-03-11"],
    py_files="s3://my-bucket/jobs/deps.zip",
    packages=["org.example:my-lib:1.0.0"],
))
```
### Return type: SparkJobRun

| Field | Type | Description |
|---|---|---|
| run_id | str | The ID of the submitted run |
### Machine types

The SparkMachineType enum covers compute-optimized (`c`), balanced (`b`), and memory-optimized (`m`) options:

| Type | vCPUs | Category |
|---|---|---|
| spark.1.c / spark.1.b / spark.1.m | 1 | Compute / Balanced / Memory |
| spark.2.c / spark.2.b / spark.2.m | 2 | Compute / Balanced / Memory |
| spark.4.c / spark.4.b / spark.4.m | 4 | Compute / Balanced / Memory |
| spark.8.c / spark.8.b / spark.8.m | 8 | Compute / Balanced / Memory |
| spark.16.c / spark.16.b / spark.16.m | 16 | Compute / Balanced / Memory |
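The enum values encode the vCPU count and category in the name itself, so they can be decoded mechanically. A small illustrative parser (not an SDK function), assuming the `spark.<vcpus>.<suffix>` naming shown in the table:

```python
def parse_machine_type(machine_type: str) -> tuple[int, str]:
    """Split a value like "spark.4.m" into (vcpus, category)."""
    categories = {"c": "compute-optimized", "b": "balanced", "m": "memory-optimized"}
    _, vcpus, suffix = machine_type.split(".")
    return int(vcpus), categories[suffix]
```

This can be handy for, say, estimating total executor vCPUs as `executor_numbers * parse_machine_type(executor_machine_type)[0]`.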
## submit_spark_job_and_wait(options)

Submit a Spark job and poll until it reaches a terminal state (`COMPLETE`, `FAIL`, or `ABORT`). Raises `TimeoutError` if the timeout is exceeded.
```python
from oleander_sdk import SubmitSparkJobAndWaitOptions

result = await client.submit_spark_job_and_wait(SubmitSparkJobAndWaitOptions(
    namespace="my-namespace",
    name="daily-etl",
    entrypoint="etl_pipeline.py",
    poll_interval_ms=5000,
    timeout_ms=300000,
))

if result.state == "COMPLETE":
    elapsed = result.run.duration
    # proceed with downstream work ...
else:
    raise Exception(f"Run {result.run_id} ended with state: {result.state}")
```
Accepts all `submit_spark_job` options, plus:

- `poll_interval_ms` (int): Milliseconds between status polls. Must be greater than 0.
- `timeout_ms` (int): Maximum time to wait, in milliseconds, before raising `TimeoutError`.
### Return type: SubmitAndWaitResult

| Field | Type | Description |
|---|---|---|
| run_id | str | The ID of the submitted run |
| state | str | Terminal state (COMPLETE, FAIL, or ABORT) |
| run | RunResponse | Full run details |
## get_run(run_id)

Get the current status of a run. Use this to poll a job submitted with `submit_spark_job()`.
```python
run = await client.get_run(run_id)

if run.state == "COMPLETE":
    duration = run.duration
    job_name = run.job.name
    # handle completion ...
elif run.state == "FAIL":
    error = run.error
    # handle failure ...
```
### Return type: RunResponse

| Field | Type | Description |
|---|---|---|
| id | str | Run ID |
| state | Optional[str] | Current state |
| started_at | Optional[str] | ISO timestamp when the run started |
| queued_at | Optional[str] | ISO timestamp when the run was queued |
| scheduled_at | Optional[str] | ISO timestamp when the run was scheduled |
| ended_at | Optional[str] | ISO timestamp when the run ended |
| duration | Optional[float] | Run duration in seconds |
| error | Optional[Any] | Error details if the run failed |
| tags | list[RunTag] | List of tags with key, value, and optional source |
| job | RunJobInfo | Job info with id, name, namespace |
| pipeline | RunPipelineInfo | Pipeline info with id, name, namespace |
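For jobs submitted with `submit_spark_job()`, the polling just described can be sketched as the loop below. This is a hedged sketch, not SDK code: only `client.get_run()` and the `state` field from the table above are taken from this document; the helper name, defaults, and timeout handling are illustrative.

```python
import asyncio

async def wait_for_terminal(client, run_id, poll_interval_s=5.0, timeout_s=300.0):
    """Poll get_run() until the run reaches a terminal state.

    Similar in spirit to submit_spark_job_and_wait(), for callers who
    already have a run_id from submit_spark_job().
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    while True:
        run = await client.get_run(run_id)
        if run.state in ("COMPLETE", "FAIL", "ABORT"):
            return run
        if loop.time() >= deadline:
            raise TimeoutError(f"run {run_id} did not finish within {timeout_s}s")
        await asyncio.sleep(poll_interval_s)
```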