Код
Код расчетного узла
В расчетном узле размещаются исполнительные модули (программ, скрипты). Для взаимодействия с сервером платформы используются специальные библиотеки.
Python
Должна использоваться библиотека rndflow-job-py
Импорт библиотеки:
from rndflow import job
params() - получить параметры узла во время запуска задания
packages() - получить пакеты задания
files(*suffixes) - загрузить данные файлов пакетов
load(readers={}) - получить поля пакетов задания и параметров узла в время запуска задания
Если имена полей пакетов и имена параметров узла совпадают, то будет загружено значение поля пакета.
save_package(label=None, files={}, fields={}, images={}) - сохранить выходной пакет
label - метка пакета, fields - поля пакета, files - файлы пакета, images - объекты matplotlib или plotly.
Объекты matplotlib сохраняются как изображения, по умолчанию если не указано расширение файла, то в PNG формате, объекты plotly сохраняются в JSON формате.
Пример:
#!/usr/bin/env python
# Импорт библиотеки
from rndflow import job
from numpy import linspace, pi
# Загрузка переменных входного пакета в качестве глобальных переменных.
globals().update(job.load())
print(f'{size = }, {span = }')
x = linspace(0, span * pi, size)
# Сохранение выходного пакета. Переменная x будет сохранена как HDF5 файл.
job.save_package(
label='x',
fields=dict(
size=size,
span=span,
job_id=job.job_id
),
files=dict(
x=x
)
)
API доступ к проекту
Для программного доступа к проекту используются API ключи.
Python
Низкоуровневый пример
Импорт библиотеки:
from rndflow import job
from rndflow.server import Server
Использование:
#!/usr/bin/env python
from rndflow import job
from rndflow.server import Server
from datetime import datetime, timedelta
from time import sleep
# Load packages variables as global variables
globals().update(job.load())
server = Server(api_key=job.secret('server_token'))
# Get secrets
project = job.secret('server_project')
input_node = job.secret('server_input')
output_node = job.secret('server_output')
# Get last data layer from the server
layer = server.get(f'/projects/{project}/data_layers/last')
# Post new package to the start node
p = server.post(f'/projects/{project}/nodes/{input_node}/packages',
params=dict(
data_layer_id=layer['id'],
),
json=dict(
label='from_client',
fields=dict(size=size, span=span)))
print('Sent workload to the server')
#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
timeout = datetime.now() + timedelta(minutes=5)
ready = False
while not ready and datetime.now() < timeout:
page = server.post(f'/projects/{project}/nodes/{output_node}/packages/search',
params=dict(
data_layer_id=layer['id'],
page_size=1
),
json=dict(
master_id=p['id']
))
for result in page['items']:
ready = True
break
else:
sleep(5)
if not ready:
raise Exception('Timeout!')
print('Done')
files = server.get(f'/projects/{project}/nodes/{output_node}/packages/{result["id"]}/files')
def download_file(f):
return lambda path: server.download(f, path)
# Save output package
job.save_package(
label='result',
fields={f['name'] : f['value'] for f in result['fields']},
files={f['name']: download_file(f) for f in files}
)
Примеры использования обертки ServerProxy
Существует возможность использования вспомогательного класса-обертки ServerProxy.
ServerProxy предлагает к использованию следующие методы:
ServerProxy - инициациализация объекта класса обертки.
Параметры:
- api_key (str): API key
- project (int): Project-server ID
- input_node (int): Input node ID of project-server
- output_node (int): Output node ID of project-server
- api_server (str, optional): API server URL. Defaults to None.
get_server - получение объекта класса обертки с использованием значений секретов.
Параметры:
- prefix (str): API key secrets common prefix name
- api_server (str,optional): API server URL. Defaults to None.
get_last_datalayer - получить идентификатор последнего доступного слоя данных.
get_data_layers - получить список доступных объектов слоев данных.
create_package_and_post - передать данные в входной узел проекта и получить идентификатор созданного пакета.
Параметры:
- layer (int): data layerd ID
- label (str): package label
- fields (dict): package fields
post_package - передать данные в входной узел проекта и получить идентификатор созданного пакета.
Параметры:
- layer (int): data layerd ID
- package (dict): package
search_by_master - запросить результат обработки из выходного узла проекта, соответствующий переданным ранее в входной узел данным.
Параметры:
- layer (int): data layerd ID
- master (int): master package id
- page (int): page number, defaults to 1.
- page_size (int): packages count on page, defaults to 1.
Возвращаемый результат - словарь с полями
total
- общее количество пакетов,items
- список пакетов (количество зависит от общего количества пакетов и параметров page и page_size).wait_result - вернуть пакеты из выходного узла проекта, соответствующие переданным ранее в входной узел данным; возвращается список пакетов и их общее количество или выбрасывает Timeout исключение.
Параметры:
- layer (int): data layerd ID
- master (int): master package id
- timeout (timedelta, optional): Timeout. Defaults to timedelta(minutes=5).
- retry_pause (int, optional): Pause between requests to output node. Defaults to 5.
- page (int): page number, defaults to 1.
- page_size (int): packages count on page, defaults to 10.
wait_one_result - вернуть первый пакет из выходного узла проекта, соответствующий переданному ранее в входной узел данным; возвращает идентификатор пакета и его поля или выбрасывает Timeout исключение.
Параметры:
- layer (int): data layerd ID
- master (int): master package id
- timeout (timedelta, optional): Timeout. Defaults to timedelta(minutes=5).
- retry_pause (int, optional): Pause between requests to output node. Defaults to 5.
get_files_list - возвращает список файлов для пакета с указанным идентификатором.
Параметры:
- ident (int): package ID
wait_one_result_and_files - вернуть первый пакет из выходного узла проекта, соответствующий переданным ранее в входной узел данным; возвращает идентификатор пакета, его поля и файлы или выбрасывает Timeout исключение.
Параметры:
- layer (int): data layerd ID
- master (int): master package id
- timeout (timedelta, optional): Timeout. Defaults to timedelta(minutes=5).
- retry_pause (int, optional): Pause between requests to output node. Defaults to 5.
Импорт библиотеки:
from rndflow import job
from rndflow.server import ServerProxy
Пример: Отправка и получение результата с использованием search_by_master
:
#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import datetime, timedelta
from time import sleep
globals().update(job.load())
api_key=job.secret('server_token')
project = job.secret('server_project')
input_node = job.secret('server_input')
output_node = job.secret('server_output')
server = ServerProxy(api_key, project, input_node, output_node)
# Get last data layer from the server
layer = server.get_last_datalayer()
# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.post_package(layer, json)
print('Sent workload to the server')
#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
timeout = datetime.now() + timedelta(minutes=5)
ready = False
while not ready and datetime.now() < timeout:
page = server.search_by_master(layer, p)
for result in page['items']:
ready = True
break
else:
sleep(5)
if not ready:
raise Exception('Timeout!')
print('Done')
files = server.get_files_list(result['id'])
def download_file(f):
return lambda path: server.download(f, path)
job.save_package(
label='result',
fields={f['name'] : f['value'] for f in result['fields']},
files={f['name']: download_file(f) for f in files}
)
Пример: Отправка и получение результата с использованием wait_one_result
:
#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import timedelta
globals().update(job.load())
api_key=job.secret('server_token')
project = job.secret('server_project')
input_node = job.secret('server_input')
output_node = job.secret('server_output')
server = ServerProxy(api_key, project, input_node, output_node)
# Get last data layer from the server
layer = server.get_last_datalayer()
# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.post_package(layer, json)
print('Sent workload to the server')
#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
rid, rfields = server.wait_one_result(layer, p)
print('Done')
files = server.get_files_list(rid)
def download_file(f):
return lambda path: server.download(f, path)
job.save_package(
label='result',
fields={f['name'] : f['value'] for f in rfields},
files={f['name']: download_file(f) for f in files}
)
Пример: Отправка и получение результата с использованием get_server
и wait_one_result_and_files
:
#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import datetime, timedelta
from time import sleep
globals().update(job.load())
# Create Server proxy object
server=ServerProxy.getServer('server')
# Get last data layer from the server
layer = server.get_last_datalayer()
# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.post_package(layer, json)
print('Sent workload to the server')
#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
rid, rfields, rfiles = server.wait_one_result_and_files(layer, p, timedelta(minutes=2))
def download_file(f):
return lambda path: server.download(f, path)
job.save_package(
label='result',
fields={f['name'] : f['value'] for f in rfields},
files={f['name']: download_file(f) for f in rfiles}
)
Пример: Получение нескольких результирующих пакетов
#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import timedelta
globals().update(job.load())
# Create Server proxy object
server=ServerProxy.get_server('server')
# Get last data layer from the server
layer = server.get_last_datalayer()
# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.post_package(layer, json)
print('Sent workload to the server')
#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
count = 0
while (count < 2):
items, _ = server.wait_result(layer, p, timedelta(minutes=2), 5)
count = len(items)
def download_file(f):
return lambda path: server.download(f, path)
for item in items:
rfields = item['fields']
rfiles = server.get_files_list(item['id'])
job.save_package(
label='result_' + item['label'],
fields={f['name'] : f['value'] for f in rfields},
files={f['name']: download_file(f) for f in rfiles}
)
Пример: Получение всех пакетов из выходного узла.
#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import timedelta
globals().update(job.load())
# Create Server proxy object
server=ServerProxy.get_server('server')
# Get last data layer from the server
layer = server.get_last_datalayer()
print('Get packages from output node...')
items, total = server.waitResult(layer, None, timedelta(minutes=2), 5)
print(f"Packages count: {total}")
print(f"Packages received: {len(items)}")
def download_file(f):
return lambda path: server.download(f, path)
for item in items:
rfields = item['fields']
rfiles = server.get_files_list(item['id'])
job.save_package(
label='result_' + item['label'],
fields={f['name'] : f['value'] for f in rfields},
files={f['name']: download_file(f) for f in rfiles}
)