# -*- coding: utf-8 -*-
from sjsclient import base
from sjsclient import utils
[docs]class JobStatus(object):
"""A Helper class that contains the job status"""
RUNNING = "RUNNING"
FINISHED = "FINISHED"
ERROR = "ERROR"
[docs]class Job(base.Resource):
"""A Spark job."""
#: Job ID
jobId = None
#: Context name
context = None
#: Jobs status
status = None
#: Time taken by the job to finish
duration = None
#: Main java class path
classpath = None
#: Response from Spark.
result = None
def __repr__(self):
return "<Job: %s>" % self.jobId
[docs] def delete(self):
"""Delete job."""
return self.manager.delete(self.jobId)
[docs] def get_config(self):
"""Get job configuration."""
return self.manager.get_config(self.jobId)
[docs]class JobConfig(dict):
"""A Spark job config dictionary."""
pass
[docs]class JobManager(base.ResourceManager):
"""Manage :class:`Job` resources."""
base_path = "jobs"
resource_class = Job
def _create_resource(self, data):
job = self.resource_class(self, data)
return job
[docs] def create(self, app, class_path, conf=None, ctx=None, sync=False):
"""Create a Spark job.
:param app: Instance of :class:`App`
:param class_path: Main class path of spark job.
:param conf: Configuration json
:param ctx: Instance of :class:`Context`
:param sync: Set to `True` for synchronous job creation
:rtype: :class:`Job`
"""
url = self.base_path
params = {'appName': app.name,
'classPath': class_path}
if ctx:
params['context'] = ctx.name
if sync:
params['sync'] = 'true'
resp = self.client._post(url, data=conf, params=params).json()
return self._create_resource(resp)
[docs] def get(self, job_id):
"""Get a specific Job. This returns more information than create.
:param job_id: The jobId of the :class:`Job` to get.
:rtype: :class:`Job`
"""
url = utils.urljoin(self.base_path, job_id)
resp = self.client._get(url).json()
return self._create_resource(resp)
[docs] def delete(self, job_id):
"""Delete a specific Job.
:param job_id: The jobId of the :class:`Job` to get.
"""
url = self.base_path
url = utils.urljoin(url, job_id)
resp = self.client._delete(url)
return resp
[docs] def get_config(self, job_id):
"""Get job configuration.
:param job_id: The jobId of the :class:`Job` to get.
:rtype: :class:`JobConfig`
"""
url = utils.urljoin(self.base_path, job_id, "config")
resp = self.client._get(url).json()
return JobConfig(resp)