Код

Код расчетного узла

В расчетном узле размещаются исполнительные модули (программ, скрипты). Для взаимодействия с сервером платформы используются специальные библиотеки.

Python

Должна использоваться библиотека rndflow-job-pyopen in new window

Импорт библиотеки:

from rndflow import job

Методыopen in new window

  • params() - получить параметры узла

  • packages() - получить пакеты узла

  • load(readers={}) - получить переменные пакетов узла

  • save_package(label=None, files={}, fields={}, images={}) - сохранить выходной пакет

    label - метка пакета, fields - поля пакета, files - файлы пакета, images - объекты matplotlibopen in new window или plotlyopen in new window.

    Объекты matplotlib сохраняются как изображения, по умолчанию если не указано расширение файла, то в PNG формате, объекты plotly сохраняются в JSON формате.

Пример:

#!/usr/bin/env python

# Импорт библиотеки
from rndflow import job

from numpy import linspace, pi

# Загрузка переменных входного пакета в качестве глобальных переменных.
globals().update(job.load())

print(f'{size = }, {span = }')

x = linspace(0, span * pi, size)

# Сохранение выходного пакета. Переменная x будет сохранена как HDF5 файл.
job.save_package(
    label='x',
    fields=dict(
        size=size,
        span=span,
        job_id=job.job_id
        ),
    files=dict(
        x=x
        )
    )

API доступ к проекту

Для программного доступа к проекту используются API ключи.

Python

Низкоуровневый пример

Импорт библиотеки:

from rndflow import job
from rndflow.server import Server

Использование:

#!/usr/bin/env python
from rndflow import job
from rndflow.server import Server
from datetime import datetime, timedelta
from time import sleep

# Load packages variables as global variables
globals().update(job.load())

server = Server(api_key=job.secret('server_token'))

# Get secrets
project = job.secret('server_project')
input_node = job.secret('server_input')
output_node = job.secret('server_output')

# Get last data layer from the server
layer = server.get(f'/projects/{project}/data_layers/last')

# Post new package to the start node
p = server.post(f'/projects/{project}/nodes/{input_node}/packages',
                params=dict(
                    data_layer_id=layer['id'],
                    ),
                json=dict(
                    label='from_client',
                    fields=dict(size=size, span=span)))

print('Sent workload to the server')

#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
timeout = datetime.now() + timedelta(minutes=5)

ready = False
while not ready and datetime.now() < timeout:
    page = server.post(f'/projects/{project}/nodes/{output_node}/packages/search',
        params=dict(
            data_layer_id=layer['id'],
            page_size=1
            ),
        json=dict(
            master_id=p['id']
            ))
    for result in page['items']:
        ready = True
        break
    else:
        sleep(5)

if not ready:
    raise Exception('Timeout!')

print('Done')

files = server.get(f'/projects/{project}/nodes/{output_node}/packages/{result["id"]}/files')

def download_file(f):
    return lambda path: server.download(f, path)

# Save output package
job.save_package(
    label='result',
    fields={f['name'] : f['value'] for f in result['fields']},
    files={f['name']: download_file(f) for f in files}
    )

Примеры использования обертки ServerProxy

Существует возможность использования вспомогательного класса-обертки ServerProxyopen in new window.

ServerProxy предлагает к использованию следующие методы:

  • ServerProxy - инициациализация объекта класса обертки.

    Параметры:

    • api_key (str): API key
    • project (int): Project-server ID
    • input_node (int): Input node ID of project-server
    • output_node (int): Output node ID of project-server
    • api_server (str, optional): API server URL. Defaults to None.
  • getServer - получение объекта класса обертки с использованием значений секретов.

    Параметры:

    • prefix (str): API key secrets common prefix name
    • api_server (str,optional): API server URL. Defaults to None.
  • getLastDataLayer - получить идентификатор последнего доступного слоя данных.

  • getDataLayers - получить список доступных объектов слоев данных.

  • postPackage - передать данные в входной узел проекта и получить идентификатор созданного пакета.

    Параметры:

    • layer (int): data layerd ID
    • label (str): package label
    • fields (dict): package fields
  • postPackage - передать данные в входной узел проекта и получить идентификатор созданного пакета.

    Параметры:

    • layer (int): data layerd ID
    • package (dict): package
  • searchByMaster - запросить результат обработки из выходного узла проекта, соответствующий переданным ранее в входной узел данным.

    Параметры:

    • layer (int): data layerd ID
    • master (int): master package id
    • page (int): page number, defaults to 1.
    • page_size (int): packages count on page, defaults to 1.

    Возвращаемый результат - словарь с полями total - общее количество пакетов, items - список пакетов (количество зависит от общего количества пакетов и параметров page и page_size).

  • waitResult - вернуть пакеты из выходного узла проекта, соответствующие переданным ранее в входной узел данным; возвращается список пакетов и их общее количество или выбрасывает Timeout исключение.

    Параметры:

    • layer (int): data layerd ID
    • master (int): master package id
    • timeout (timedelta, optional): Timeout. Defaults to timedelta(minutes=5).
    • retry_pause (int, optional): Pause between requests to output node. Defaults to 5.
    • page (int): page number, defaults to 1.
    • page_size (int): packages count on page, defaults to 10.
  • waitOneResult - вернуть первый пакет из выходного узла проекта, соответствующий переданному ранее в входной узел данным; возвращает идентификатор пакета и его поля или выбрасывает Timeout исключение.

    Параметры:

    • layer (int): data layerd ID
    • master (int): master package id
    • timeout (timedelta, optional): Timeout. Defaults to timedelta(minutes=5).
    • retry_pause (int, optional): Pause between requests to output node. Defaults to 5.
  • getFilesList - возвращает список файлов для пакета с указанным идентификатором.

    Параметры:

    • ident (int): package ID
  • waitOneResultAndFiles - вернуть первый пакет из выходного узла проекта, соответствующий переданным ранее в входной узел данным; возвращает идентификатор пакета, его поля и файлы или выбрасывает Timeout исключение.

    Параметры:

    • layer (int): data layerd ID
    • master (int): master package id
    • timeout (timedelta, optional): Timeout. Defaults to timedelta(minutes=5).
    • retry_pause (int, optional): Pause between requests to output node. Defaults to 5.

Импорт библиотеки:

from rndflow import job
from rndflow.server import ServerProxy

Пример: Отправка и получение результата с использованием searchByMaster:

#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import datetime, timedelta
from time import sleep

globals().update(job.load())

api_key=job.secret('server_token')
project = job.secret('server_project')
input_node = job.secret('server_input')
output_node = job.secret('server_output')

server = ServerProxy(api_key, project, input_node, output_node)

# Get last data layer from the server
layer = server.getLastDataLayer()

# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.postPackage(layer, json)

print('Sent workload to the server')

#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
timeout = datetime.now() + timedelta(minutes=5)

ready = False
while not ready and datetime.now() < timeout:

    page = server.searchByMaster(layer, p)

    for result in page['items']:
        ready = True
        break
    else:
        sleep(5)

if not ready:
    raise Exception('Timeout!')

print('Done')

files = server.getFilesList(result['id'])

def download_file(f):
    return lambda path: server.download(f, path)

job.save_package(
    label='result',
    fields={f['name'] : f['value'] for f in result['fields']},
    files={f['name']: download_file(f) for f in files}
    )

Пример: Отправка и получение результата с использованием waitOneResult:

#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import timedelta

globals().update(job.load())

api_key=job.secret('server_token')
project = job.secret('server_project')
input_node = job.secret('server_input')
output_node = job.secret('server_output')

server = ServerProxy(api_key, project, input_node, output_node)

# Get last data layer from the server
layer = server.getLastDataLayer()

# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.postPackage(layer, json)

print('Sent workload to the server')

#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
rid, rfields = server.waitOneResult(layer, p)

print('Done')

files = server.getFilesList(rid)

def download_file(f):
    return lambda path: server.download(f, path)

job.save_package(
    label='result',
    fields={f['name'] : f['value'] for f in rfields},
    files={f['name']: download_file(f) for f in files}
    )

Пример: Отправка и получение результата с использованием getServer и waitOneResultAndFiles:

#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import datetime, timedelta
from time import sleep

globals().update(job.load())

# Create Server proxy object
server=ServerProxy.getServer('server')

# Get last data layer from the server
layer = server.getLastDataLayer()

# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.postPackage(layer, json)

print('Sent workload to the server')

#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
rid, rfields, rfiles = server.waitOneResultAndFiles(layer, p, timedelta(minutes=2))

def download_file(f):
    return lambda path: server.download(f, path)

job.save_package(
    label='result',
    fields={f['name'] : f['value'] for f in rfields},
    files={f['name']: download_file(f) for f in rfiles}
    )

Пример: Получение нескольких результирующих пакетов

#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import timedelta

globals().update(job.load())

# Create Server proxy object
server=ServerProxy.getServer('server')

# Get last data layer from the server
layer = server.getLastDataLayer()

# Post new package to the start node
json=dict(label='from_client', fields=dict(size=size, span=span))
p = server.postPackage(layer, json)

print('Sent workload to the server')

#---------------------------------------------------------------------------
# Poll server to get the resulting package.
# This allows to exit on timeout.
#---------------------------------------------------------------------------
print('Waiting for the results...')
count = 0
while (count < 2):
    items, _ = server.waitResult(layer, p, timedelta(minutes=2), 5)
    count = len(items)

def download_file(f):
    return lambda path: server.download(f, path)

for item in items:

    rfields = item['fields']
    rfiles = server.getFilesList(item['id'])

    job.save_package(
        label='result_' + item['label'],
        fields={f['name'] : f['value'] for f in rfields},
        files={f['name']: download_file(f) for f in rfiles}
    )

Пример: Получение всех пакетов из выходного узла.

#!/usr/bin/env python
from rndflow import job
from rndflow.server import ServerProxy
from datetime import timedelta

globals().update(job.load())

# Create Server proxy object
server=ServerProxy.getServer('server')

# Get last data layer from the server
layer = server.getLastDataLayer()

print('Get packages from output node...')
items, total = server.waitResult(layer, None, timedelta(minutes=2), 5)

print(f"Packages count: {total}")
print(f"Packages received: {len(items)}")

def download_file(f):
 return lambda path: server.download(f, path)

for item in items:

    rfields = item['fields']
    rfiles = server.getFilesList(item['id'])

    job.save_package(
        label='result_' + item['label'],
        fields={f['name'] : f['value'] for f in rfields},
        files={f['name']: download_file(f) for f in rfiles}
    )