Source code for cybergis_compute_client.Job

from .MarkdownTable import MarkdownTable  # noqa
import time
import json
from os import system, name
from IPython.display import display, clear_output, Markdown
import ipywidgets as widgets


[docs]class Job: """ Job class Attributes: client (obj): Client that this job requests information from maintainer (obj): Maintainer pool that this job is in isJupyter (bool): Whether or not this is running in Jupyter jupyterhubApiToken (str): API token needed to send requests using the JupyterHub API id (str): Id assigned to this job by the client hpc (str): HPC that this job will be submitted to """ # static variables basicEventTypes = [ 'JOB_QUEUED', 'JOB_REGISTERED', 'JOB_INIT', 'GLOBUS_TRANSFER_INIT_SUCCESS', 'JOB_ENDED', 'JOB_FAILED'] def __init__(self, maintainer=None, hpc=None, id=None, hpcUsername=None, hpcPassword=None, client=None, isJupyter=None, jupyterhubApiToken=None, printJob=True): # TODO: we can make this better if (jupyterhubApiToken is None): raise Exception('please login to jupyter first') self.client = client self.maintainer = maintainer self.isJupyter = isJupyter self.jupyterhubApiToken = jupyterhubApiToken job = None if (id is None): # create new job if maintainer is None: raise Exception('maintainer cannot by NoneType') req = {'maintainer': maintainer, 'jupyterhubApiToken': jupyterhubApiToken} if (hpc is not None): req['hpc'] = hpc if (hpcUsername is None): job = self.client.request('POST', '/job', req) else: req['user'] = hpcUsername req['password'] = hpcPassword job = self.client.request('POST', '/job', req) hpc = job['hpc'] id = job['id'] else: # reinstate existing job job = self.client.request('GET', '/job/' + id, {'jupyterhubApiToken': jupyterhubApiToken}) hpc = job['hpc'] if (hpcPassword is not None): print('⚠️ HPC password input detected, change your code to use .get_job_by_id() instead') print('🙅‍♂️ it\'s not safe to distribute code with login credentials') self.id = id self.hpc = hpc if printJob: self._print_job_formatted(job)
[docs] def submit(self): """ Submits this job to the client, and prints the output Returns: Job: This job """ body = {'jupyterhubApiToken': self.jupyterhubApiToken} job = self.client.request('POST', '/job/' + self.id + '/submit', body) print('✅ job submitted') self._print_job_formatted(job) return self
[docs] def set(self, localExecutableFolder=None, localDataFolder=None, localResultFolder=None, param=None, env=None, slurm=None, printJob=True): """ PUT requests information about this job to the client so it can be submitted to the hpc. Displays information about this job unless specified otherwise. Args: executableFolder (str): Path of the executable folder dataFolder (str): Path of the data folder resultFolder (str): Path of the result folder param (dict): Rules for input data env (dict): Enviorment variables required by the appliation slurm (dict): Slurm input rules printJob (bool): If the status of the job should be printed """ body = {'jupyterhubApiToken': self.jupyterhubApiToken} if localExecutableFolder: body['localExecutableFolder'] = localExecutableFolder if localDataFolder: body['localDataFolder'] = localDataFolder if localResultFolder: body['localResultFolder'] = localResultFolder if param: body['param'] = param if env: body['env'] = env if slurm: body['slurm'] = slurm if (len(list(body)) == 1): print('❌ please set at least one parmeter') job = self.client.request('PUT', '/job/' + self.id, body) if printJob: self._print_job(job)
[docs] def events( self, raw=False, basic=True, refreshRateInSeconds=10): """ While the job is running, display the events generated by the client Args: raw (bool): If true, return a list of the events generated by status liveOutput (bool): basic (bool): If true, exclude non-basicEventType events RefreshRateInSeconds (int): Number of seconds to wait before refreshing status Todo: Modify function to include liveOutput or remove it from the arguments """ if raw: return self.status(raw=True)['events'] isEnd = False jobFailure = False while (not isEnd): self._clear() status = self.status(raw=True) out = status['events'] headers = ['types', 'message', 'time'] events = [] for o in out: # if o['type'] not in self.basicEventTypes and basic: # continue events.append([ o['type'], o['message'], o['createdAt'] ]) isEnd = isEnd or o['type'] == 'JOB_ENDED' or o[ 'type'] == 'JOB_FAILED' if isEnd and o['type'] == 'JOB_FAILED': jobFailure = True print('📮 Job ID: ' + self.id) if 'slurmId' in status: print('🤖 Slurm ID: ' + str(status['slurmId'])) def markdown_widget(text): out = widgets.Output() with out: display(Markdown(text)) return out markdown = MarkdownTable.render(events, headers) markdown_table = markdown_widget(markdown) table_exp = widgets.Accordion(children=[markdown_table], selected_index=None) table_exp.set_title(0, "See events") if len(events) > 0: if self.isJupyter: display(table_exp) else: print(markdown) if not isEnd: time.sleep(refreshRateInSeconds) return jobFailure
[docs] def logs(self, raw=False, liveOutput=True, refreshRateInSeconds=15): """ While the job is running, display the logs generated by the client. Args: raw (bool): If true, return a list of the events generated by status liveOutput (bool): RefreshRateInSeconds (int): Number of seconds to wait before refreshing status Returns: list: List of logs generated by the client. Only returned if raw is true. Todo: Modify function to include liveOutput or remove it from the arguments """ if raw: return self.status(raw=True)['logs'] logs = [] isEnd = False while (not isEnd): self._clear() status = self.status(raw=True) headers = ['message', 'time'] logs = [] for o in status['events']: isEnd = isEnd or o['type'] == 'JOB_ENDED' or o[ 'type'] == 'JOB_FAILED' for o in status['logs']: i = [ o['message'], o['createdAt'] ] logs.append(i) print('📮 Job ID: ' + self.id) if 'slurmId' in status: print('🤖 Slurm ID: ' + str(status['slurmId'])) def markdown_widget(text): out = widgets.Output() with out: display(Markdown(text)) return out markdown = MarkdownTable.render(logs, headers) markdown_table = markdown_widget(markdown) table_exp = widgets.Accordion(children=[markdown_table], selected_index=None) table_exp.set_title(0, "See logs") if len(logs) > 0: if self.isJupyter: display(table_exp) else: print(markdown) if not isEnd: time.sleep(refreshRateInSeconds)
[docs] def status(self, raw=False): """ Displays the status of this job, and returns it if specified. Args: raw (bool): If information about this job should be returned Returns: dict: Infomation about this job returned by the client. This includes the job's 'id', 'hpc', 'executableFolder', 'dataFolder', 'resultFolder', 'param', 'slurm', 'userId', 'maintainer', 'createdAt', and 'events' Raises: Exception: If the 'id' attribute is None """ if self.id is None: raise Exception('missing job ID, submit/register job first') job = self.client.request('GET', '/job/' + self.id, { 'jupyterhubApiToken': self.jupyterhubApiToken }) if raw: return job self._print_job_formatted(job)
[docs] def result_folder_content(self): """ Returns the results from the job Returns: dict: Results from running the job Raises: Exception: If the id is None """ if self.id is None: raise Exception('missing job ID, submit/register job first') out = self.client.request('GET', '/job/' + self.id + '/result-folder-content', {'jupyterhubApiToken': self.jupyterhubApiToken}) return out
[docs] def download_result_folder_by_globus(self, localPath=None, localEndpoint=None, remotePath=None, raw=False): """ Downloads the folder with results from the job using Globus Args: remotePath (string): Path to the remote result folder raw (bool): If the function should return the output from the client Returns: dict: Output from the client when downloading the results using globus. Only returned when raw is true. Raises: Exception: If the job ID is None Exception: If the key 'resultFolder' is not returned with status Exception: If the result folder is formatted improperly """ if self.id is None: raise Exception('missing job ID, submit/register job first') jobStatus = self.status(raw=True) if 'remoteResultFolder' not in jobStatus: raise Exception('executable folder is not ready') folderId = jobStatus['remoteResultFolder']['id'] # init globus transfer self.client.request('POST', '/folder/' + folderId + '/download/globus-init', { "jobId": self.id, "jupyterhubApiToken": self.jupyterhubApiToken, "fromPath": remotePath, "toPath": localPath, "toEndpoint": localEndpoint }) status = None while status not in ['SUCCEEDED', 'FAILED']: self._clear() print('⏳ waiting for file to download using Globus') out = self.client.request('GET', '/folder/' + folderId + '/download/globus-status', { "jupyterhubApiToken": self.jupyterhubApiToken }) status = out['status'] if raw: return out # exit loop self._clear() if status == 'SUCCEEDED': print('✅ download success!') else: print('❌ download fail!')
# Helpers def _clear(self): """ Clears output """ if self.isJupyter: clear_output(wait=True) # for windows if name == 'nt': _ = system('cls') # for mac and linux(here, os.name is 'posix') else: _ = system('clear') def _print_job(self, job): """ Displays information about this job Args: job (dict): Information about this job returned by the client """ if job is None: return headers = [ 'id', 'slurmId', 'hpc', 'remoteExecutableFolder', 'remoteDataFolder', 'remoteResultFolder', 'param', 'slurm', 'userId', 'maintainer', 'createdAt'] data = [[ job['id'], job['slurmId'], job['hpc'], job['remoteExecutableFolder'], job['remoteDataFolder'], job['remoteResultFolder'], json.dumps(job['param']), json.dumps(job['slurm']), job['userId'], job['maintainer'], job['createdAt'], ]] if self.isJupyter: display(Markdown(MarkdownTable.render(data, headers))) else: print(MarkdownTable.render(data, headers)) def _print_job_formatted(self, job): """ Displays information about the job formatted in a way that can be read with no horizonal scroll bar """ if job is None: return if job['localExecutableFolder'] is None: modelName = "None" else: modelName = job['localExecutableFolder']['gitId'] headersCol1 = [ 'id', 'slurmId', 'hpc', 'remoteExecutableFolder', 'remoteDataFolder', 'remoteResultFolder'] headersCol2 = [ 'param', 'slurm', 'userId', 'maintainer', 'createdAt', 'modelName'] dataCol1 = [[ job['id'], job['slurmId'], job['hpc'], job['remoteExecutableFolder'], job['remoteDataFolder'], job['remoteResultFolder'], ]] dataCol2 = [[ json.dumps(job['param']), json.dumps(job['slurm']), job['userId'], job['maintainer'], job['createdAt'], modelName ]] if self.isJupyter: display(Markdown(MarkdownTable.render(dataCol1, headersCol1))) display(Markdown(MarkdownTable.render(dataCol2, headersCol2))) else: print(MarkdownTable.render(dataCol1, headersCol1)) print(MarkdownTable.render(dataCol2, headersCol2))