Skip to content

Luna Common

adapters

FileWriteAdatper

Bases: WriteAdapter

Source code in src/luna/common/adapters.py
class FileWriteAdatper(WriteAdapter):
    """WriteAdapter that copies input files into a directory tree on a file system."""

    def __init__(self, store_url, bucket):
        """Configure the adapter from a storage URL and list what is stored there.

        Only the path component of ``store_url`` is kept. The constructor prints the
        resulting base URL followed by every entry directly under the storage path.

        Args:
            store_url (str): root URL for the storage location (e.g. file:///data)
            bucket (str): the "bucket" or "parent folder" for the storage location
        """
        # Scheme and host are not needed for local files, only the path
        self.store_path = urlparse(store_url).path
        self.bucket = bucket

        # Prefix of every data_url reported by write()
        self.base_url = f"file://{Path(self.store_path)}"
        print("Configured FileAdatper with base URL=" + self.base_url)

        print("Available buckets:")
        for entry in os.listdir(self.store_path):
            print(f" - {entry}")

    def write(self, input_data, prefix) -> dict:
        """Copy a file to <store_path>/<bucket>/<prefix>/<filename>.

        The copy is skipped when the destination reports the same size as the input
        and the input's modification time is not more than 60 seconds later than the
        destination's.

        Args:
            input_data (str): path to input file
            prefix (str): relative path prefix for destination
        Returns:
            dict: empty if the input file does not exist; ``{"readable": False}`` if
                the destination size is not positive after the operation; otherwise
                the keys "readable", "data_url", "size" and "ingest_time"
        Raises:
            RuntimeError: if prefix is an absolute path
        """
        source = Path(input_data)

        if not os.path.exists(source):
            return {}

        # User needs to know prefixes are relative paths
        if Path(prefix).is_absolute():
            raise RuntimeError("Prefix paths must be relative!")

        filename = source.name
        data_url = os.path.join(self.base_url, self.bucket, prefix, filename)

        target_dir = os.path.join(self.store_path, self.bucket, prefix)
        target = os.path.join(target_dir, filename)
        os.makedirs(target_dir, exist_ok=True)

        src_size, src_mtime = get_file_stats(input_data)
        dst_size, dst_mtime = get_file_stats(target)

        # Copy when the sizes differ, or when the input is newer than the stored
        # copy by more than the 60 second grace period
        if dst_size != src_size or (src_mtime - 60) > dst_mtime:
            shutil.copy(input_data, target)

        # Re-read the stats so the report describes what is stored now
        dst_size, dst_mtime = get_file_stats(target)

        if dst_size > 0:
            return {
                "readable": True,
                "data_url": data_url,
                "size": dst_size,
                "ingest_time": datetime.fromtimestamp(dst_mtime),
            }

        return {"readable": False}

__init__(store_url, bucket)

Return a WriteAdapter for a given file I/O scheme and URL

Parameters:

Name Type Description Default
store_url str

root URL for the storage location (e.g. s3://localhost:9000 or file:///data)

required
bucket str

the "bucket" or "parent folder" for the storage location

required
Source code in src/luna/common/adapters.py
def __init__(self, store_url, bucket):
    """Configure the adapter from a storage URL and list what is stored there.

    Only the path component of ``store_url`` is kept. The constructor prints the
    resulting base URL followed by every entry directly under the storage path.

    Args:
        store_url (str): root URL for the storage location (e.g. file:///data)
        bucket (str): the "bucket" or "parent folder" for the storage location
    """
    # Scheme and host are not needed for local files, only the path
    self.store_path = urlparse(store_url).path
    self.bucket = bucket

    # Prefix of every data_url reported by write()
    self.base_url = f"file://{Path(self.store_path)}"
    print("Configured FileAdatper with base URL=" + self.base_url)

    print("Available buckets:")
    for entry in os.listdir(self.store_path):
        print(f" - {entry}")

write(input_data, prefix)

Perform write operation to a posix file system

Will not perform write if

the content length matches (full copy) and the input modification time is earlier than the ingest time (with a 1 min. grace period)

Parameters:

Name Type Description Default
input_data str

path to input file

required
prefix str

relative path prefix for destination

required

Returns: dict: key-value pairs containing metadata about the write operation

Source code in src/luna/common/adapters.py
def write(self, input_data, prefix) -> dict:
    """Copy a file to <store_path>/<bucket>/<prefix>/<filename>.

    The copy is skipped when the destination reports the same size as the input
    and the input's modification time is not more than 60 seconds later than the
    destination's.

    Args:
        input_data (str): path to input file
        prefix (str): relative path prefix for destination
    Returns:
        dict: empty if the input file does not exist; ``{"readable": False}`` if
            the destination size is not positive after the operation; otherwise
            the keys "readable", "data_url", "size" and "ingest_time"
    Raises:
        RuntimeError: if prefix is an absolute path
    """
    source = Path(input_data)

    if not os.path.exists(source):
        return {}

    # User needs to know prefixes are relative paths
    if Path(prefix).is_absolute():
        raise RuntimeError("Prefix paths must be relative!")

    filename = source.name
    data_url = os.path.join(self.base_url, self.bucket, prefix, filename)

    target_dir = os.path.join(self.store_path, self.bucket, prefix)
    target = os.path.join(target_dir, filename)
    os.makedirs(target_dir, exist_ok=True)

    src_size, src_mtime = get_file_stats(input_data)
    dst_size, dst_mtime = get_file_stats(target)

    # Copy when the sizes differ, or when the input is newer than the stored
    # copy by more than the 60 second grace period
    if dst_size != src_size or (src_mtime - 60) > dst_mtime:
        shutil.copy(input_data, target)

    # Re-read the stats so the report describes what is stored now
    dst_size, dst_mtime = get_file_stats(target)

    if dst_size > 0:
        return {
            "readable": True,
            "data_url": data_url,
            "size": dst_size,
            "ingest_time": datetime.fromtimestamp(dst_mtime),
        }

    return {"readable": False}

IOAdapter

Interface for IO

Exposes a write and read method via scheme specific classes:

IOAdapter.writer().write() -> url

IOAdapter.reader().read() -> data (TODO)

Source code in src/luna/common/adapters.py
class IOAdapter:
    """Interface for IO

    Exposes a write and read method via scheme specific classes:

    IOAdapter.writer(<scheme>).write(<data>) -> url

    IOAdapter.reader(<scheme>).read(<url>)   -> data (TODO)
    """

    def __init__(self, no_write=False):
        # When true, writer() always hands back a NoWriteAdapter
        self.no_write = no_write

    def writer(self, store_url, bucket):
        """Pick the WriteAdapter that matches the scheme of ``store_url``

        Args:
            store_url (str): root URL for the storage location (e.g. s3://localhost:9000 or file:///data)
            bucket (str): the "bucket" or "parent folder" for the storage location

        Returns:
            WriteAdapter: a NoWriteAdapter when this IOAdapter was created with
                no_write set, otherwise the adapter for the "file", "s3" or
                "s3+https" scheme

        Raises:
            RuntimeError: if the URL scheme is none of the supported ones
        """
        if self.no_write:
            return NoWriteAdapter()

        scheme = urlparse(store_url).scheme

        if scheme == "file":
            return FileWriteAdatper(store_url=store_url, bucket=bucket)

        if scheme == "s3":
            return MinioWriteAdatper(store_url=store_url, bucket=bucket)

        if scheme == "s3+https":
            return MinioWriteAdatper(store_url=store_url, bucket=bucket, secure=True)

        raise RuntimeError(
            "Unsupported slide store schemes, please try s3:// or file://"
        )

writer(store_url, bucket)

Return a WriteAdapter for a given file I/O scheme and URL

Parameters:

Name Type Description Default
store_url str

root URL for the storage location (e.g. s3://localhost:9000 or file:///data)

required
bucket str

the "bucket" or "parent folder" for the storage location

required

Returns WriteAdapter: object capable of writing to the location at store_url

Source code in src/luna/common/adapters.py
def writer(self, store_url, bucket):
    """Pick the WriteAdapter that matches the scheme of ``store_url``

    Args:
        store_url (str): root URL for the storage location (e.g. s3://localhost:9000 or file:///data)
        bucket (str): the "bucket" or "parent folder" for the storage location

    Returns:
        WriteAdapter: a NoWriteAdapter when ``self.no_write`` is set, otherwise
            the adapter for the "file", "s3" or "s3+https" scheme

    Raises:
        RuntimeError: if the URL scheme is none of the supported ones
    """
    if self.no_write:
        return NoWriteAdapter()

    scheme = urlparse(store_url).scheme

    if scheme == "file":
        return FileWriteAdatper(store_url=store_url, bucket=bucket)

    if scheme == "s3":
        return MinioWriteAdatper(store_url=store_url, bucket=bucket)

    if scheme == "s3+https":
        return MinioWriteAdatper(store_url=store_url, bucket=bucket, secure=True)

    raise RuntimeError(
        "Unsupported slide store schemes, please try s3:// or file://"
    )

MinioWriteAdatper

Bases: WriteAdapter

Source code in src/luna/common/adapters.py
class MinioWriteAdatper(WriteAdapter):
    """WriteAdapter that uploads input files to a bucket through the Minio client."""

    def __init__(self, store_url, bucket, secure=False):
        """Configure the adapter from a storage URL and list the available buckets.

        Credentials are read from the MINIO_USER and MINIO_PASSWORD environment
        variables. The constructor creates one client to print the bucket names.

        Args:
            store_url (str): root URL for the storage location (e.g. s3://localhost:9000)
            bucket (str): the "bucket" or "parent folder" for the storage location
            secure (bool): forwarded to the Minio client as its ``secure`` flag

        Raises:
            KeyError: if MINIO_USER or MINIO_PASSWORD is not set in the environment
        """

        url_result = urlparse(store_url)

        # We need a bit more detail here
        self.store_hostname = url_result.hostname
        self.store_port = url_result.port
        self.bucket = bucket
        self.secure = secure

        # urlparse reports a missing port as None; leave it out of the endpoint
        # instead of formatting it as the literal text "None"
        if self.store_port is None:
            endpoint = f"{self.store_hostname}"
        else:
            endpoint = f"{self.store_hostname}:{self.store_port}"

        # A factory rather than a client: write() builds a fresh client per call
        self.client_init = partial(
            Minio,
            endpoint,
            access_key=os.environ["MINIO_USER"],
            secret_key=os.environ["MINIO_PASSWORD"],
            secure=secure,
        )

        self.base_url = f"s3://{endpoint}"
        print("Configured MinioAdatper with base URL=" + self.base_url)

        client = self.client_init()

        print("Available buckets:")
        for x in client.list_buckets():
            print(f" - {x.name}")

    def write(self, input_data, prefix) -> dict:
        """Upload a file to the bucket as the object <prefix>/<filename>.

        The upload is skipped when the stored object reports the same size as the
        input and the input's modification time is not more than 60 seconds later
        than the object's.

        Args:
            input_data (str): path to input file
            prefix (str): relative path prefix for destination
        Returns:
            dict: empty if the input file does not exist; ``{"readable": False}`` if
                the object size is not positive after the operation; otherwise
                the keys "readable", "data_url", "size" and "ingest_time"
        Raises:
            RuntimeError: if prefix is an absolute path
        """
        input_path = Path(input_data)
        prefix_path = Path(prefix)
        filename = input_path.name

        if not os.path.exists(input_path):
            return {}

        if prefix_path.is_absolute():
            raise RuntimeError(
                "Prefix paths must be relative!"
            )  # User needs to know prefixes are relative paths

        output_data_url = os.path.join(self.base_url, self.bucket, prefix, filename)

        # The same object name is used for the stat calls and for the upload
        object_name = f"{prefix_path}/{filename}"

        client = self.client_init()

        object_size, object_mtime = get_object_stats(client, self.bucket, object_name)
        input_size, input_mtime = get_file_stats(input_data)
        needs_write = (
            object_size != input_size or (input_mtime - 60) > object_mtime
        )  # 60 second grace period

        if needs_write:
            client.fput_object(
                self.bucket,
                object_name,
                input_path,
                part_size=250000000,
            )

        # Re-read the stats so the report describes what is stored now
        object_size, object_mtime = get_object_stats(client, self.bucket, object_name)

        if not object_size > 0:
            return {"readable": False}

        return {
            "readable": True,
            "data_url": output_data_url,
            "size": object_size,
            "ingest_time": datetime.fromtimestamp(object_mtime),
        }

__init__(store_url, bucket, secure=False)

Return a WriteAdapter for a given file I/O scheme and URL

Parameters:

Name Type Description Default
store_url str

root URL for the storage location (e.g. s3://localhost:9000 or file:///data)

required
bucket str

the "bucket" or "parent folder" for the storage location

required
Source code in src/luna/common/adapters.py
def __init__(self, store_url, bucket, secure=False):
    """Configure the adapter from a storage URL and list the available buckets.

    Credentials are read from the MINIO_USER and MINIO_PASSWORD environment
    variables. The constructor creates one client to print the bucket names.

    Args:
        store_url (str): root URL for the storage location (e.g. s3://localhost:9000)
        bucket (str): the "bucket" or "parent folder" for the storage location
        secure (bool): forwarded to the Minio client as its ``secure`` flag

    Raises:
        KeyError: if MINIO_USER or MINIO_PASSWORD is not set in the environment
    """

    url_result = urlparse(store_url)

    # We need a bit more detail here
    self.store_hostname = url_result.hostname
    self.store_port = url_result.port
    self.bucket = bucket
    self.secure = secure

    # urlparse reports a missing port as None; leave it out of the endpoint
    # instead of formatting it as the literal text "None"
    if self.store_port is None:
        endpoint = f"{self.store_hostname}"
    else:
        endpoint = f"{self.store_hostname}:{self.store_port}"

    # A factory rather than a client, so a fresh client can be built on demand
    self.client_init = partial(
        Minio,
        endpoint,
        access_key=os.environ["MINIO_USER"],
        secret_key=os.environ["MINIO_PASSWORD"],
        secure=secure,
    )

    self.base_url = f"s3://{endpoint}"
    print("Configured MinioAdatper with base URL=" + self.base_url)

    client = self.client_init()

    print("Available buckets:")
    for x in client.list_buckets():
        print(f" - {x.name}")

write(input_data, prefix)

Perform write operation to a s3 file system

Will not perform write if

the content length matches (full copy) and the input modification time is earlier than the ingest time (with a 1 min. grace period)

Parameters:

Name Type Description Default
input_data str

path to input file

required
prefix str

relative path prefix for destination

required

Returns: dict: key-value pairs containing metadata about the write operation

Source code in src/luna/common/adapters.py
def write(self, input_data, prefix) -> dict:
    """Upload a file to the bucket as the object <prefix>/<filename>.

    The upload is skipped when the stored object reports the same size as the
    input and the input's modification time is not more than 60 seconds later
    than the object's.

    Args:
        input_data (str): path to input file
        prefix (str): relative path prefix for destination
    Returns:
        dict: empty if the input file does not exist; ``{"readable": False}`` if
            the object size is not positive after the operation; otherwise
            the keys "readable", "data_url", "size" and "ingest_time"
    Raises:
        RuntimeError: if prefix is an absolute path
    """
    source = Path(input_data)
    prefix_path = Path(prefix)

    if not os.path.exists(source):
        return {}

    # User needs to know prefixes are relative paths
    if prefix_path.is_absolute():
        raise RuntimeError("Prefix paths must be relative!")

    filename = source.name
    data_url = os.path.join(self.base_url, self.bucket, prefix, filename)

    # The same object name is used for the stat calls and for the upload
    object_name = f"{prefix_path}/{filename}"

    client = self.client_init()

    stored_size, stored_mtime = get_object_stats(client, self.bucket, object_name)
    src_size, src_mtime = get_file_stats(input_data)

    # Upload when the sizes differ, or when the input is newer than the stored
    # object by more than the 60 second grace period
    if stored_size != src_size or (src_mtime - 60) > stored_mtime:
        client.fput_object(
            self.bucket,
            object_name,
            source,
            part_size=250000000,
        )

    # Re-read the stats so the report describes what is stored now
    stored_size, stored_mtime = get_object_stats(client, self.bucket, object_name)

    if stored_size > 0:
        return {
            "readable": True,
            "data_url": data_url,
            "size": stored_size,
            "ingest_time": datetime.fromtimestamp(stored_mtime),
        }

    return {"readable": False}

NoWriteAdapter

Bases: WriteAdapter

Source code in src/luna/common/adapters.py
class NoWriteAdapter(WriteAdapter):
    """WriteAdapter that copies nothing and reports the input file as the stored data."""

    def __init__(self):
        self.base_url = "file://"
        print("Configured NoWriteAdatper with base URL=" + self.base_url)

    def write(self, input_data, prefix) -> dict:
        """Describe the input file as if it had been written, without copying it

        Args:
            input_data (str): path to input file
            prefix (str): relative path prefix for destination, ignored
        Returns:
            dict: empty if the input file does not exist; otherwise the keys
                "readable", "data_url", "size" and "ingest_time", taken from the
                input file itself
        """
        source = Path(input_data)

        if not os.path.exists(source):
            return {}

        size, mtime = get_file_stats(input_data)

        return {
            "readable": True,
            "data_url": f"{self.base_url}{source}",
            "size": size,
            "ingest_time": datetime.fromtimestamp(mtime),
        }

write(input_data, prefix)

Returns input_data as written data

Parameters:

Name Type Description Default
input_data str

path to input file

required
prefix str

relative path prefix for destination, ignored

required

Returns: dict: key-value pairs containing metadata about the write operation

Source code in src/luna/common/adapters.py
def write(self, input_data, prefix) -> dict:
    """Describe the input file as if it had been written, without copying it

    Args:
        input_data (str): path to input file
        prefix (str): relative path prefix for destination, ignored
    Returns:
        dict: empty if the input file does not exist; otherwise the keys
            "readable", "data_url", "size" and "ingest_time", taken from the
            input file itself
    """
    source = Path(input_data)

    if not os.path.exists(source):
        return {}

    size, mtime = get_file_stats(input_data)

    return {
        "readable": True,
        "data_url": f"{self.base_url}{source}",
        "size": size,
        "ingest_time": datetime.fromtimestamp(mtime),
    }

config

Created on October 17, 2019

@author: pashaa@mskcc.org

ConfigSet

This is a singleton class that can load a collection of configurations from yaml files.

ConfigSet loads configurations from yaml files only once on first invocation of this class with the specified yaml file. The class then maintains the configuration in memory in a singleton instance. All new invocations of this class will serve up the same configuration.

Each configuration in the collection is identified by a logical name.

If a new invocation of this class is created with an existing logical name and a different yaml file, the singleton instance replaces the existing configuration with the newly specified yaml file for the given logical name.

Source code in src/luna/common/config.py
class ConfigSet:
    """
    This is a singleton class that can load a collection of configurations from yaml files.

    ConfigSet loads configurations from yaml files only once on first invocation of this class with
    the specified yaml file. The class then maintains the configuration in memory in a singleton instance. All new
    invocations of this class will serve up the same configuration.

    Each configuration in the collection is identified by a logical name.

    If a new invocation of this class is created with an existing logical name and a different yaml file, the singleton
    instance replaces the existing configuration with the newly specified yaml file for the given logical name.
    """

    __CONFIG_MAP = {}  # maps logical name to yaml config file name
    __SCHEMA_MAP = {}  # maps logical name to schema file of yaml config file
    __INSTANCE = None  # singleton instance containing the collection of configs keyed by logical name

    def __new__(cls, name=None, config_file=None, schema_file=None):
        """
        Return the singleton, first loading config_file under name when both are given.

        :param name: logical name of the configuration (optional)
        :param config_file: yaml config file to load for that name (optional)
        :param schema_file: schema file used to validate the config file (optional)
        :return: the singleton instance, or None when called without name or config_file
                 before any configuration has been loaded
        """
        # assume one or more collections have already been loaded
        if name is None or config_file is None:
            return ConfigSet.__INSTANCE

        # initialize singleton
        if ConfigSet.__INSTANCE is None:
            ConfigSet.__INSTANCE = object.__new__(cls)
            ConfigSet.__INSTANCE.__config = {}

        # load or reload config into memory
        if (
            name not in ConfigSet.__CONFIG_MAP
            or ConfigSet.__CONFIG_MAP[name] != config_file
        ):
            ConfigSet.__CONFIG_MAP[name] = config_file
            ConfigSet.__INSTANCE.__config[name] = ConfigSet._load_config(cls, name)

            # add schema and validate config
            if schema_file is not None:
                ConfigSet.__SCHEMA_MAP[name] = schema_file
                ConfigSet._validate_config(cls, name)

        return ConfigSet.__INSTANCE

    def __init__(self, name=None, config_file=None, schema_file=None):
        """
        :param name: logical name to be given for this configuration. This argument only needs to be provided on first
                     invocation (optional).
        :param config_file: the config file to load. This argument only needs to be provided on first
                            invocation (optional).
        :param schema_file: a schema file for the yaml configuration (optional)
        :raises yamale.yamale_error.YamaleError: if config file is invalid when validated against the schema
        """
        pass  # see __new__() method implementation

    # _validate_config and _load_config are plain functions that __new__ calls
    # through the class, passing cls explicitly as the first argument.
    def _validate_config(cls, name):
        """
        Validate the config file registered under name against its registered schema file.

        :param name: logical name of the config to validate
        """
        config_file = ConfigSet.__CONFIG_MAP[name]
        schema_file = ConfigSet.__SCHEMA_MAP[name]
        logger.info(
            "validating config "
            + config_file
            + " against schema "
            + schema_file
            + " for "
            + name
        )
        schema = yamale.make_schema(schema_file)
        data = yamale.make_data(config_file)
        yamale.validate(schema, data)

    def _load_config(cls, name):
        """
        Read every yaml document in the config file registered under name and merge them.

        :param name: logical name of the config to load
        :return: a dict holding the merged top-level keys of all documents in the file;
                 keys from later documents replace those from earlier ones

        :raises: IOError if yaml config file for the specified logical name cannot be found
        """
        # read config file
        config_file = ConfigSet.__CONFIG_MAP[name]
        logger.info("loading config file " + config_file)

        try:
            stream = open(config_file, "r")
        except IOError as err:
            logger.error(
                "unable to find a config file with name "
                + config_file
                + ". Please use config.yaml.template to make a "
                + config_file
                + ". "
                + str(err)
            )
            raise

        config = {}
        # load_all is consumed inside the with block so that the file is still
        # open while it is parsed and is closed as soon as parsing ends
        with stream:
            for items in yaml.load_all(stream, Loader=yaml.FullLoader):
                # an empty yaml document loads as None and has nothing to merge
                if items is not None:
                    config.update(items)

        return config

    @staticmethod
    def _require_loaded(name):
        """
        Check that a configuration with the given logical name is available.

        :param name: logical name of the configuration
        :raises: ValueError if a configuration with the specified name was never loaded
        """
        if (
            ConfigSet.__INSTANCE is None
            or name not in ConfigSet.__INSTANCE.__config
        ):
            raise ValueError(
                "configuration with logical name " + name + " was never loaded"
            )

    def _parse_path(self, path):
        """
        Split a "name::jsonpath" string at its first "::".

        :param path: string of the form "name::jsonpath"
        :return: a dict with the keys "name" and "jsonpath"
        :raises: ValueError if path does not contain "::"
        """
        path_segments = path.split("::", 1)  # split just once

        if len(path_segments) != 2:
            err = (
                "Illegal config path: " + path + '. must be of form "name::jsonpath" '
                "where name is the logical name of the configuration and jsonpath is the "
                "jsonpath into the yaml configuration"
            )
            logger.error(err)
            raise ValueError(err)

        return {"name": path_segments[0], "jsonpath": path_segments[1]}

    def _get_match(self, name, jsonpath):
        """
        Evaluate a jsonpath expression against the configuration loaded under name.

        :param name: logical name of the configuration
        :param jsonpath: jsonpath expression to evaluate
        :return: the result of the expression's find() call on the configuration
        """
        jsonpath_expression = parse(jsonpath)

        return jsonpath_expression.find(ConfigSet.__INSTANCE.__config[name])

    def has_value(self, path):
        """
        Tells whether the specified jsonpath matches anything in the specified configuration.

        Args:
            path (str): path to a value in a configuration. The path must be of the form
            "name::jsonpath" where name is the logical name of the configuration and jsonpath is the jsonpath to value.
            see config.yaml to generate a jsonpath. See https://pypi.org/project/jsonpath-ng/ jsonpath expressions
            may be tested here - https://jsonpath.com/

        Returns:
            boolean: true if the jsonpath has at least one match, else false.

        Raises:
            ValueError: if the path is not of the form "name::jsonpath" or a configuration
            with the specified name was never loaded

        """
        parsed = self._parse_path(path)
        name = parsed["name"]
        jsonpath = parsed["jsonpath"]

        ConfigSet._require_loaded(name)

        return len(self._get_match(name, jsonpath)) != 0

    def get_value(self, path):
        """
        Gets the value for the specified jsonpath from the specified configuration.

        Args:
            path (str): path to a value in a configuration. The path must be of the form "name::jsonpath"
            where name is the logical name of the configuration and jsonpath is the jsonpath to value.
            see config.yaml to generate a jsonpath. See https://pypi.org/project/jsonpath-ng/
            jsonpath expressions may be tested here - https://jsonpath.com/

        Returns:
            the value of the first match for the jsonpath

        Raises:
            ValueError: if the path is not of the form "name::jsonpath", no match is found for
            the specified jsonpath, or a configuration with the specified name was never loaded

        """
        parsed = self._parse_path(path)
        name = parsed["name"]
        jsonpath = parsed["jsonpath"]

        ConfigSet._require_loaded(name)

        match = self._get_match(name, jsonpath)

        if len(match) == 0:
            err = "unable to find a config value for jsonpath: " + jsonpath
            logger.error(err)
            raise ValueError(err)

        return match[0].value

    def get_names(self):
        """

        :return: a list of logical names of the configs stored in this instance.
        """
        if ConfigSet.__INSTANCE is not None:
            return list(ConfigSet.__INSTANCE.__config.keys())
        else:
            return []

    def get_keys(self, name):
        """

        :param name: logical name of the configuration
        :return: a list of top-level keys in the config stored in this instance.
        :raises: ValueError if a configuration with the specified name was never loaded
        """
        ConfigSet._require_loaded(name)

        return list(ConfigSet.__INSTANCE.__config[name].keys())

    def get_config_set(self, name):
        """

        :param name: logical name of the configuration
        :return: the dictionary holding the config stored in this instance under name.
        :raises: ValueError if a configuration with the specified name was never loaded

        """
        ConfigSet._require_loaded(name)

        return ConfigSet.__INSTANCE.__config[name]

    def clear(self):
        """
        clear the entire collection of configurations
        """
        ConfigSet.__CONFIG_MAP = {}
        ConfigSet.__SCHEMA_MAP = {}
        ConfigSet.__INSTANCE = None

__init__(name=None, config_file=None, schema_file=None)

Parameters: name - logical name to be given for this configuration; this argument only needs to be provided on first invocation (optional). config_file - the config file to load; this argument only needs to be provided on first invocation (optional). schema_file - a schema file for the yaml configuration (optional). Raises: yamale.yamale_error.YamaleError if the config file is invalid when validated against the schema.

Source code in src/luna/common/config.py
def __init__(self, name=None, config_file=None, schema_file=None):
    """
    Intentionally empty; the arguments are handled by __new__().

    :param name: logical name to be given for this configuration. This argument only needs to be provided on first
                 invocation (optional).
    :param config_file: the config file to load. This argument only needs to be provided on first
                        invocation (optional).
    :param schema_file: a schema file for the yaml configuration (optional)
    :raises yamale.yamale_error.YamaleError: if config file is invalid when validated against the schema
    """
    pass  # see __new__() method implementation

clear()

clear the entire collection of configurations

Source code in src/luna/common/config.py
def clear(self):
    """
    Reset the class-level state: drop the singleton instance and forget every
    registered config file and schema file.
    """
    ConfigSet.__INSTANCE = None
    ConfigSet.__SCHEMA_MAP = {}
    ConfigSet.__CONFIG_MAP = {}

get_config_set(name)

Parameters: name - logical name of the configuration. Returns: the dictionary holding the config stored in this instance under that name. Raises: ValueError if a configuration with the specified name was never loaded.

Source code in src/luna/common/config.py
def get_config_set(self, name):
    """

    :param name: logical name of the configuration
    :return: the dictionary holding the config stored in this instance under name.
    :raises: ValueError if a configuration with the specified name was never loaded

    """
    instance = ConfigSet.__INSTANCE

    # Guard clause inverted: return on the happy path, raise otherwise
    if instance is not None and name in instance.__config.keys():
        return instance.__config[name]

    raise ValueError(
        "configuration with logical name " + name + " was never loaded"
    )

get_keys(name)

:param name: logical name of the configuration :return: a list of top-level keys in the config stored in this instance. :raises: ValueError if a configuration with the specified name was never loaded

Source code in src/luna/common/config.py
def get_keys(self, name):
    """

    :param name: logical name of the configuration
    :return: a list of top-level keys in the config stored in this instance.
    :raises: ValueError if a configuration with the specified name was never loaded
    """
    instance = ConfigSet.__INSTANCE

    # Guard clause inverted: return on the happy path, raise otherwise
    if instance is not None and name in instance.__config.keys():
        return list(instance.__config[name].keys())

    raise ValueError(
        "configuration with logical name " + name + " was never loaded"
    )

get_names()

:return: a list of logical names of the configs stored in this instance.

Source code in src/luna/common/config.py
def get_names(self):
    """

    :return: a list of logical names of the configs stored in this instance;
             empty when no singleton instance exists.
    """
    instance = ConfigSet.__INSTANCE

    if instance is None:
        return []

    return list(instance.__config.keys())

get_value(path)

Gets the value for the specified jsonpath from the specified configuration.

Parameters:

Name Type Description Default
path str

path to a value in a configuration. The path must be of the form "name::jsonpath" where name is the logical name of the configuration and jsonpath is the jsonpath to value. See config.yaml to generate a jsonpath. See https://pypi.org/project/jsonpath-ng/ - jsonpath expressions may be tested here: https://jsonpath.com/

required

Returns:

Name Type Description
str

value from config file

Raises:

Type Description
ValueError

if no match is found for the specified jsonpath expression, or a configuration with the specified name was never loaded

Source code in src/luna/common/config.py
def get_value(self, path):
    """
    Look up a single value in a loaded configuration by jsonpath.

    Args:
        path (str): location of the value, written as "name::jsonpath", where
            name is the logical name of the configuration and jsonpath is the
            jsonpath to the value. See config.yaml to generate a jsonpath and
            https://pypi.org/project/jsonpath-ng/ for the syntax; expressions
            may be tested at https://jsonpath.com/

    Returns:
        the ``value`` of the first match found for the jsonpath

    Raises:
        ValueError: if the jsonpath matches nothing, or if a configuration
            with the specified name was never loaded
    """
    parts = self._parse_path(path)
    name, jsonpath = parts["name"], parts["jsonpath"]

    instance = ConfigSet.__INSTANCE
    if instance is None or name not in instance.__config.keys():
        raise ValueError(
            "configuration with logical name " + name + " was never loaded"
        )

    matches = self._get_match(name, jsonpath)
    if len(matches) != 0:
        # Only the first match is reported, even if the jsonpath hit several.
        return matches[0].value

    err = "unable to find a config value for jsonpath: " + jsonpath
    logger.error(err)
    raise ValueError(err)

has_value(path)

Parameters:

Name Type Description Default
path str

path to a value in a configuration. The path must be of the form "name::jsonpath", where name is the logical name of the configuration and jsonpath is the jsonpath to the value. See config.yaml to generate a jsonpath, and https://pypi.org/project/jsonpath-ng/ for the jsonpath syntax. jsonpath expressions may be tested at https://jsonpath.com/

required

Returns:

Name Type Description
boolean

true if value is not an empty string, else false.

Raises:

Type Description
ValueError

if a configuration with the specified name was never loaded

Source code in src/luna/common/config.py
def has_value(self, path):
    """
    Report whether a jsonpath matches anything in a loaded configuration.

    Args:
        path (str): location of the value, written as "name::jsonpath", where
            name is the logical name of the configuration and jsonpath is the
            jsonpath to the value. See config.yaml to generate a jsonpath and
            https://pypi.org/project/jsonpath-ng/ for the syntax; expressions
            may be tested at https://jsonpath.com/

    Returns:
        boolean: True when the jsonpath produces at least one match, else False.

    Raises:
        ValueError: if a configuration with the specified name was never loaded
    """
    parts = self._parse_path(path)
    name, jsonpath = parts["name"], parts["jsonpath"]

    instance = ConfigSet.__INSTANCE
    if instance is None or name not in instance.__config.keys():
        raise ValueError(
            "configuration with logical name " + name + " was never loaded"
        )

    return len(self._get_match(name, jsonpath)) != 0

connectors

DremioClientAuthMiddleware

Bases: ClientMiddleware

A ClientMiddleware that extracts the bearer token from the authorization header returned by the Dremio Flight Server Endpoint. Parameters


factory : ClientHeaderAuthMiddlewareFactory The factory to set call credentials if an authorization header with bearer token is returned by the Dremio server.

Source code in src/luna/common/connectors.py
class DremioClientAuthMiddleware(flight.ClientMiddleware):
    """
    A ClientMiddleware that extracts the bearer token from
    the authorization header returned by the Dremio
    Flight Server Endpoint.

    Parameters
    ----------
    factory : ClientHeaderAuthMiddlewareFactory
        The factory to set call credentials if an
        authorization header with bearer token is
        returned by the Dremio server.
    """

    def __init__(self, factory):
        self.factory = factory

    def received_headers(self, headers):
        """Forward the authorization header, if present, to the factory.

        Parameters
        ----------
        headers : mapping of header name to a list of values
            Headers received from the server for the current call.

        Responses that carry no (or an empty) authorization header are
        ignored, leaving the factory's current credential untouched.
        """
        auth_header_key = "authorization"
        authorization_header = []
        for key in headers:
            if key.lower() == auth_header_key:
                # Look the value up under the key as it was actually sent;
                # using the lowercase literal would miss a differently-cased key.
                authorization_header = headers.get(key)

        # Indexing [0] on a missing/empty header used to raise here.
        if not authorization_header:
            return

        self.factory.set_call_credential(
            [b"authorization", authorization_header[0].encode("utf-8")]
        )

DremioClientAuthMiddlewareFactory

Bases: ClientMiddlewareFactory

A factory that creates DremioClientAuthMiddleware(s).

Source code in src/luna/common/connectors.py
class DremioClientAuthMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Middleware factory that hands out DremioClientAuthMiddleware objects.

    The factory also holds the most recent call credential reported back by
    the middleware instances it creates.
    """

    def __init__(self):
        # No credential is known until a middleware reports one.
        self.call_credential = list()

    def set_call_credential(self, call_credential):
        """Remember the credential supplied by a middleware instance."""
        self.call_credential = call_credential

    def start_call(self, info):
        """Create the middleware for a new call, linked back to this factory."""
        middleware = DremioClientAuthMiddleware(self)
        return middleware

DremioDataframeConnector

A connector that interfaces with a Dremio instance/cluster via Apache Arrow Flight for fast read performance Parameters


scheme: connection scheme hostname: host of main dremio name flightport: which port dremio exposes to flight requests dremio_user: username to use dremio_password: associated password connection_args: anything else to pass to the FlightClient initialization

Source code in src/luna/common/connectors.py
class DremioDataframeConnector:
    """
    Reads data from a Dremio instance/cluster over Apache Arrow Flight and
    returns it as pandas dataframes.

    Parameters
    ----------
    scheme: connection scheme
    hostname: host of the main dremio node
    flightport: which port dremio exposes to flight requests
    dremio_user: username to use
    dremio_password: associated password
    connection_args: mapping of extra keyword arguments passed to the
        FlightClient initialization
    """

    def __init__(
        self,
        scheme,
        hostname,
        flightport,
        dremio_user,
        dremio_password,
        connection_args,
    ):
        # TLS is not configured here.

        # Workload-management headers sent with the initial authentication
        # request: a routing tag and a routing queue.
        wlm_headers = [
            (b"routing-tag", b"test-routing-tag"),
            (b"routing-queue", b"Low Cost User Queries"),
        ]
        initial_options = flight.FlightCallOptions(headers=wlm_headers)

        auth_factory = DremioClientAuthMiddlewareFactory()
        location = f"{scheme}://{hostname}:{flightport}"
        flight_client = flight.FlightClient(
            location,
            middleware=[auth_factory],
            **connection_args,
        )

        # The value returned by the basic-auth handshake is kept and sent as a
        # header on every later call (see run_query).
        self.bearer_token = flight_client.authenticate_basic_token(
            dremio_user, dremio_password, initial_options
        )
        self.client = flight_client

    def get_table(self, space, table_name):
        """
        Return the virtual table at project(or "space").table_name as a pandas dataframe

        Parameters:
        ----------
        space: Project ID/Space to read from
        table_name:  Table name to load
        """
        select_all = f""" SELECT * FROM "{space}"."{table_name}" """
        return self.run_query(select_all)

    def run_query(self, sqlquery):
        """
        Execute a SQL query and return its result set as a pandas dataframe

        Parameters:
        ----------
        sqlquery: SQL text to run
        """
        print("[INFO] Query: ", sqlquery)

        # Every call is made with the stored bearer token as its only header.
        call_options = flight.FlightCallOptions(headers=[self.bearer_token])

        # FlightInfo carries the ticket that identifies the query's result set.
        descriptor = flight.FlightDescriptor.for_command(sqlquery)
        info = self.client.get_flight_info(descriptor, call_options)

        # Only the first endpoint's ticket is read, as a stream of Arrow
        # record batches converted to pandas.
        ticket = info.endpoints[0].ticket
        stream = self.client.do_get(ticket, call_options)
        return stream.read_pandas()

get_table(space, table_name)

Return the virtual table at project(or "space").table_name as a pandas dataframe Parameters:


space: Project ID/Space to read from table_name: Table name to load

Source code in src/luna/common/connectors.py
def get_table(self, space, table_name):
    """
    Load every row of "space"."table_name" and return it as whatever
    run_query returns (a pandas dataframe).

    Parameters:
    ----------
    space: Project ID/Space to read from
    table_name:  Table name to load
    """
    select_all = f""" SELECT * FROM "{space}"."{table_name}" """
    result = self.run_query(select_all)
    return result

run_query(sqlquery)

Execute the given SQL query against Dremio and return its result set as a pandas dataframe. Parameters:


sqlquery: SQL query text to run

Source code in src/luna/common/connectors.py
def run_query(self, sqlquery):
    """
    Execute a SQL query and return its result set as a pandas dataframe

    Parameters:
    ----------
    sqlquery: SQL text to run
    """
    print("[INFO] Query: ", sqlquery)

    # Every call is made with the stored bearer token as its only header.
    call_options = flight.FlightCallOptions(headers=[self.bearer_token])

    # FlightInfo carries the ticket that identifies the query's result set.
    descriptor = flight.FlightDescriptor.for_command(sqlquery)
    info = self.client.get_flight_info(descriptor, call_options)

    # Only the first endpoint's ticket is read, as a stream of Arrow record
    # batches converted to pandas.
    ticket = info.endpoints[0].ticket
    stream = self.client.do_get(ticket, call_options)
    return stream.read_pandas()

dask

configure_dask_client(**kwargs)

Instantiate a Dask client according to the given configuration. This should only be called once in a given program. The client created here can always be retrieved (where needed) using get_or_create_dask_client().

Source code in src/luna/common/dask.py
def configure_dask_client(**kwargs):
    """Create (or attach to) the Dask client for this program.

    Intended to be called once per program; afterwards the client can be
    retrieved where needed using get_or_create_dask_client().

    Selection order:
      1. if keyword arguments are given, build ``Client(**kwargs)``;
      2. otherwise, if the environment variable named by DASK_ADDRESS_VAR is
         set, validate it and connect via ``get_client(address)``;
      3. otherwise call ``get_client()`` with no arguments.

    Any ValueError raised along the way (including an invalid address) is
    logged and a ``Client(threads_per_worker=1)`` is returned instead.
    """
    try:
        if kwargs:
            return Client(**kwargs)

        scheduler = os.getenv(DASK_ADDRESS_VAR)
        if not scheduler:
            return get_client()

        if not validate_dask_address(scheduler):
            raise ValueError(f"Env var {DASK_ADDRESS_VAR} has illegal value '{scheduler}'")
        return get_client(scheduler)
    except ValueError as exc:
        logger.warning(f"Error connecting to Dask scheduler.  Reverting to LocalCluster.\n{exc}")
        return Client(threads_per_worker=1)

dask_job(job_name)

The simpler version of a dask job decorator, which only provides the worker_client as a runner to the calling function

Examples:

>>> @dask_job('my_job')
>>> my_job(args, kwargs, runner=None):
>>>     runner.submit(sleep, 10)
Source code in src/luna/common/dask.py
def dask_job(job_name):
    """
    The simpler version of a dask job decorator (no threading / event loop).

    The returned decorator replaces ``func`` with a runner that is called as
    ``runner(namespace, index, *args, **kwargs)``. The runner:

    * creates ``$MIND_GPFS_DIR/data_dev/<namespace>/<index>`` (output dir) and
      ``$MIND_GPFS_DIR/data_dev/<namespace>/<JOB_NAME>_DS`` (dataset dir);
    * calls ``func(index, output_dir, output_segment, *args, **kwargs)`` where
      ``output_segment`` is ``ResultSegment-<index>.parquet`` inside the
      dataset dir;
    * returns whatever ``func`` returns.

    Args:
        job_name (str): name of the job; also becomes ``__name__`` of the runner.

    Raises (from the runner):
        KeyError: if the MIND_GPFS_DIR environment variable is not set.
        JobExecutionError: if ``func`` raises; the original exception is
            attached as the cause.

    Examples:
        >>> @dask_job('my_job')
        >>> def my_job(index, output_dir, output_segment, *args, **kwargs):
        >>>     ...
        >>> my_job(namespace, index)
    """

    def wrapped(func):
        def run_simple(namespace, index, *args, **kwargs):
            """
            Prepare output locations, then run the job function in-line.
            """
            # Tell us we are running
            logger.info(f"Initializing {job_name} @ {namespace}/{index}")

            # See if we are on a dask worker; not being on one is tolerated.
            try:
                worker = get_worker()
            except ValueError:
                logger.warning("Could not get dask worker!")
                worker = None
            except Exception:
                logger.exception("Unknown exception when getting dask worker")
                worker = None

            # Only claim success when a worker was actually found.
            if worker is not None:
                logger.info(f"Successfully found worker {worker}")

            # Jobs get an output directory and an output parquet slice
            output_dir = os.path.join(
                os.environ["MIND_GPFS_DIR"], "data_dev", namespace, index
            )
            output_ds = os.path.join(
                os.environ["MIND_GPFS_DIR"],
                "data_dev",
                namespace,
                job_name.upper() + "_DS",
            )
            os.makedirs(output_dir, exist_ok=True)
            os.makedirs(output_ds, exist_ok=True)
            output_segment = os.path.join(output_ds, f"ResultSegment-{index}.parquet")

            logger.info(f"Setup ouput dir={output_dir} slice={output_ds}")

            # Kick off the job
            try:
                logger.info(f"Running job {func} with args: {args}, kwargs: {kwargs}")
                return_value = func(index, output_dir, output_segment, *args, **kwargs)
            except Exception as exc:
                logger.exception(
                    f"Job execution failed due to: {exc}",
                    extra={"namespace": namespace, "key": index},
                )
                # Chain explicitly so the root cause is kept on the new error.
                raise JobExecutionError(
                    f"Job {func} did not run successfully, please check input data! {args}, {kwargs}"
                ) from exc

            return return_value

        run_simple.__name__ = job_name

        return run_simple

    return wrapped

prune_empty_delayed(tasks)

A less-than-ideal method to prune empty tasks from dask tasks Here we're trading CPU and time for memory.

Parameters:

Name Type Description Default
tasks list

list of delayed dask tasks

required

Returns:

Type Description

list[dask.delayed]: a reduced list of delayed dask tasks

Source code in src/luna/common/dask.py
def prune_empty_delayed(tasks):
    """
    Drop tasks whose result is None from a list of delayed dask tasks.

    This is a less-than-ideal approach: every task is computed up front and
    each non-None result is re-wrapped in a trivial delayed task, trading CPU
    and time for memory.

    Args:
        tasks (list): list of delayed dask tasks

    Returns:
        list[dask.delayed]: a reduced list of delayed dask tasks
    """

    @dask.delayed
    def mock_return(task):
        # Identity task: just hands back the already-computed result.
        return task

    n_tasks = len(tasks)
    if n_tasks >= LENGTH_MANY_TASKS:
        logger.warning(f"Hope you're okay with a length={len(tasks)} for loop!")

    computed = dask.compute(*tasks)

    kept = []
    for result in computed:
        if result is None:
            continue
        kept.append(mock_return(result))
    return kept

with_event_loop(func)

This method decorates functions run on dask workers with an async function call Namely, this allows us to manage the execution of a function a bit better, and especially, to exit job execution if things take too long (1hr)

Here, the function func is run in a background thread, and has access to the dask scheduler through the 'runner'. Critically, submission to this runner/client looks the same regardless of whether it occurs in a sub-process/thread

Mostly, this is a workaround to implement some form of timeout when running very long tasks on dask. While one cannot (or should not) kill the running thread, Dask will clean up the child tasks eventually once all jobs finish.

Examples:

>>> @with_event_loop
>>> my_job(args, kwargs, runner=None):
>>>     runner.submit(sleep, 10)
Source code in src/luna/common/dask.py
def with_event_loop(func):
    """
    Decorate a function run on dask workers so that it executes inside its own
    asyncio event loop with a one hour timeout.

    The function ``func`` is run in the dask worker's executor, and receives a
    dask worker client as the keyword argument ``runner``, through which it can
    submit further work to the scheduler.

    Mostly, this is a workaround to implement some form of timeout when running
    very long tasks on dask. The pending future is cancelled after the timeout;
    a thread that is already running cannot (and should not) be killed, so the
    job itself may keep running in the background.

    Args:
        func (callable): job function; must accept a ``runner`` keyword argument.

    Returns:
        callable: a synchronous wrapper around ``func``. It returns the result
        of ``func``, or None if the job did not finish within 3600 seconds.

    Raises (from the wrapper):
        RuntimeError: if called outside of a dask worker.

    Examples:
        >>> @with_event_loop
        >>> def my_job(args, kwargs, runner=None):
        >>>     runner.submit(sleep, 10)
    """

    async def wrapped(*args, **kwargs):
        loop = asyncio.get_event_loop()

        # Get our current dask worker, functions wrapped with this method can only be run on dask workers
        logger.info("Initializing job... getting parent worker")
        try:
            worker = get_worker()
        except ValueError as exc:
            logger.error("Could not get dask worker!")
            raise RuntimeError(
                "Data-processing job called without parent dask worker"
            ) from exc
        except Exception:
            logger.exception("Unknown exception when getting dask worker")
            # Without a worker nothing below can run; previously this fell
            # through and failed later on the unbound `worker` name.
            raise

        logger.info(f"Successfully found worker {worker}")
        logger.info(f"Running job {func} with args: {args}, kwargs: {kwargs}")

        # Get our worker client, and pass as a dask client executor
        with worker_client() as runner:
            # Add our runner to kwargs
            kwargs["runner"] = runner

            # Kick off the job in the worker's executor
            job = loop.run_in_executor(worker.executor, partial(func, *args, **kwargs))

            # Move on from job if things take more than an hour
            done, pending = await asyncio.wait([job], timeout=3600)

            # Do some cleanup
            if len(pending) != 0:
                logger.warning("Killing pending tasks!")
                for task in pending:
                    task.cancel()

            # Get the return value (re-raises here if the job itself raised)
            if len(done) == 1:
                return_value = done.pop().result()
            else:
                return_value = None

            # Log that we're done!
            logger.info(f"Done running job, returning {return_value}")

        return return_value

    def run_loop(*args, **kwargs):
        """
        Uses async and threading capabilities

        Use of background thread causes this error on shutdown:
            ERROR - asyncio - task: <Task pending coro=<HTTP1ServerConnection._server_request_loop() running at /gpfs/mskmindhdp_emc/sw/env/lib64/python3.6/site-packages/tornado/http1connection.py:817> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f52e8259318>()]> cb=[IOLoop.add_future.<locals>.<lambda>() at /gpfs/mskmindhdp_emc/sw/env/lib64/python3.6/site-packages/tornado/ioloop.py:690]>
            Seems like some async task gets hung up in the child thread...
        """

        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(wrapped(*args, **kwargs))
        finally:
            # Always release the loop, including when the job raised.
            loop.close()

    # A decorator has to hand back the wrapper; without this the decorated
    # name would be rebound to None.
    return run_loop

stats

compute_stats_1d(vec, fx_name_prefix, n_percentiles=4)

Computes 1d (histogram)-like summary statistics

Parameters:

Name Type Description Default
vec array

a 1-d vector input

required
fx_name_prefix str

Prefix for feature names

required
n_percentiles int

Number of percentiles to compute, default 4 = 0 (min), 25, 50, 75, 100 (max)

4

Returns:

Name Type Description
dict

summary statistics

Source code in src/luna/common/stats.py
def compute_stats_1d(vec, fx_name_prefix, n_percentiles=4):
    """Summarise a 1-d vector with moment and percentile (histogram-like) features.

    Args:
        vec (np.array): a 1-d vector input
        fx_name_prefix (str): Prefix for feature names
        n_percentiles (int): Number of percentile intervals; ``n_percentiles + 1``
            evenly spaced percentiles from 0 to 100 are reported, so the
            default 4 gives 0 (min), 25, 50, 75, 100 (max)

    Returns:
        dict: summary statistics keyed ``<prefix>_nobs``, ``_mean``,
        ``_variance``, ``_skewness``, ``_kurtosis`` and ``_pct<P>``, where
        ``<P>`` is the percentile truncated to an integer
    """
    # describe() yields (nobs, minmax, mean, variance, skewness, kurtosis);
    # min/max are dropped because pct0/pct100 below cover them.
    nobs, _minmax, mean, variance, skewness, kurtosis = scipy.stats.describe(vec)

    features = {}
    features[f"{fx_name_prefix}_nobs"] = nobs
    features[f"{fx_name_prefix}_mean"] = mean
    features[f"{fx_name_prefix}_variance"] = variance
    features[f"{fx_name_prefix}_skewness"] = skewness
    features[f"{fx_name_prefix}_kurtosis"] = kurtosis

    cut_points = np.linspace(0, 100, n_percentiles + 1)
    cut_values = np.percentile(vec, cut_points)

    features.update(
        {
            f"{fx_name_prefix}_pct{int(point)}": value
            for point, value in zip(cut_points, cut_values)
        }
    )

    return features

utils

LunaCliCall

Source code in src/luna/common/utils.py
class LunaCliCall:
    """A fully configured CLI invocation, ready to be run as a named step."""

    def __init__(self, cli_call, cli_client):
        """Store the argument list and owning client, and echo the command.

        Args:
            cli_call (list): command and arguments, one element per token
            cli_client: the client that tracks step outputs for this call
        """
        self.cli_client = cli_client
        self.cli_call = cli_call
        print(" ".join(f"{x}" for x in cli_call))

    def run(self, step_name):
        """Run (execute) CLI Call given a 'step_name', add step to parent CLI Client once completed.

        Args:
            step_name (str): Name of the CLI call, determines output directory, can act as inputs to other CLI steps

        Raises:
            RuntimeError: if step_name contains a "/"
        """
        if "/" in step_name:
            raise RuntimeError("Cannot name steps with path-like character /")

        # The output location is handed to the command as its "-o" option.
        output_dir = self.cli_client.get_output_dir(step_name)
        self.cli_call.extend(["-o", output_dir])

        print(self.cli_call)

        process = subprocess.Popen(
            self.cli_call, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        out, err = process.communicate()

        print(f"{out.decode()}\n{err.decode()}")

        # Register the step so later steps can use its output as an input.
        self.cli_client.cli_steps[step_name] = output_dir

run(step_name)

Run (execute) CLI Call given a 'step_name', add step to parent CLI Client once completed. Args: step_name (str): Name of the CLI call, determines output directory, can act as inputs to other CLI steps

Source code in src/luna/common/utils.py
def run(self, step_name):
    """Run (execute) CLI Call given a 'step_name', add step to parent CLI Client once completed.

    Args:
        step_name (str): Name of the CLI call, determines output directory, can act as inputs to other CLI steps

    Raises:
        RuntimeError: if step_name contains a "/"
    """
    if "/" in step_name:
        raise RuntimeError("Cannot name steps with path-like character /")

    # The output location is handed to the command as its "-o" option.
    output_dir = self.cli_client.get_output_dir(step_name)
    self.cli_call.extend(["-o", output_dir])

    print(self.cli_call)

    process = subprocess.Popen(
        self.cli_call, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    out, err = process.communicate()

    print(f"{out.decode()}\n{err.decode()}")

    # Register the step so later steps can use its output as an input.
    self.cli_client.cli_steps[step_name] = output_dir

LunaCliClient

Source code in src/luna/common/utils.py
class LunaCliClient:
    """Tracks named CLI steps and their output locations under one working directory."""

    def __init__(self, base_dir, uuid):
        """Initialize Luna CLI Client with a base directory (the root working directory) and a UUID to track results.

        Args:
            base_dir (str): parent working directory
            uuid (str): some unique string for this instance
        """
        root = Path(base_dir)
        self.base_dir = root.expanduser()
        self.uuid = uuid
        # Maps step name -> path of that step's data/output.
        self.cli_steps = dict()

    def bootstrap(self, step_name, data_path):
        """Add data (bootstrap a root CLI call).

        Args:
            step_name (str): Name of the (bootstrap) CLI call, determines output directory, can act as inputs to other CLI steps
            data_path (str): Input data path
        """
        source = Path(data_path).expanduser()
        self.cli_steps[step_name] = source

    def configure(self, cli_resource, *args, **kwargs):
        """Configure a CLI step.

        Args:
            cli_resource (str): space-separated CLI command string
            args (list): List of CLI arguments; an argument equal to a known
                step name is replaced by that step's stored path, anything
                else is treated as a path (with ``~`` expanded)
            kwargs (dict): CLI parameters, emitted as ``--key value``; for
                bool values only ``--key`` is emitted
        Returns:
            LunaCliCall
        """
        command = cli_resource.split(" ")
        command.extend(
            self.cli_steps[arg]
            if arg in self.cli_steps.keys()
            else Path(arg).expanduser()
            for arg in args
        )

        for option, value in kwargs.items():
            command.append(f"--{option}")
            if isinstance(value, bool):
                # Boolean options are bare flags with no value token.
                continue
            command.append(f"{value}")

        return LunaCliCall(command, self)

    def get_output_dir(self, step_name):
        """Get output_dir based on base_dir, uuid, and step name.

        Args:
            step_name (str): name of the CLI step
        Returns:
            output_dir (str)
        """
        return os.path.join(self.base_dir, self.uuid, step_name)

__init__(base_dir, uuid)

Initialize Luna CLI Client with a base directory (the root working directory) and a UUID to track results.

Parameters:

Name Type Description Default
base_dir str

parent working directory

required
uuid str

some unique string for this instance

required
Source code in src/luna/common/utils.py
def __init__(self, base_dir, uuid):
    """Initialize Luna CLI Client with a base directory (the root working directory) and a UUID to track results.

    Args:
        base_dir (str): parent working directory
        uuid (str): some unique string for this instance
    """
    root = Path(base_dir)
    self.base_dir = root.expanduser()
    self.uuid = uuid
    # Maps step name -> path of that step's data/output.
    self.cli_steps = dict()

bootstrap(step_name, data_path)

Add data (bootstrap a root CLI call).

Parameters:

Name Type Description Default
step_name str

Name of the (bootstrap) CLI call, determines output directory, can act as inputs to other CLI steps

required
data_path str

Input data path

required
Source code in src/luna/common/utils.py
def bootstrap(self, step_name, data_path):
    """Add data (bootstrap a root CLI call).

    Args:
        step_name (str): Name of the (bootstrap) CLI call, determines output directory, can act as inputs to other CLI steps
        data_path (str): Input data path
    """
    source = Path(data_path).expanduser()
    self.cli_steps[step_name] = source

configure(cli_resource, *args, **kwargs)

Configure a CLI step.

Parameters:

Name Type Description Default
cli_resource str

CLI Resource string like

required
args list

List of CLI arguments

()
kwargs list

List of CLI parameters

{}

Returns: LunaCliCall

Source code in src/luna/common/utils.py
def configure(self, cli_resource, *args, **kwargs):
    """Configure a CLI step.

    Args:
        cli_resource (str): space-separated CLI command string
        args (list): List of CLI arguments; an argument equal to a known step
            name is replaced by that step's stored path, anything else is
            treated as a path (with ``~`` expanded)
        kwargs (dict): CLI parameters, emitted as ``--key value``; for bool
            values only ``--key`` is emitted
    Returns:
        LunaCliCall
    """
    command = cli_resource.split(" ")
    command.extend(
        self.cli_steps[arg]
        if arg in self.cli_steps.keys()
        else Path(arg).expanduser()
        for arg in args
    )

    for option, value in kwargs.items():
        command.append(f"--{option}")
        if isinstance(value, bool):
            # Boolean options are bare flags with no value token.
            continue
        command.append(f"{value}")

    return LunaCliCall(command, self)

get_output_dir(step_name)

Get output_dir based on base_dir, uuid, and step name.

Parameters:

Name Type Description Default
step_name str

name of the CLI step

required

Returns: output_dir (str)

Source code in src/luna/common/utils.py
def get_output_dir(self, step_name):
    """Get output_dir based on base_dir, uuid, and step name.

    Args:
        step_name (str): name of the CLI step
    Returns:
        output_dir (str): ``<base_dir>/<uuid>/<step_name>``
    """
    return os.path.join(self.base_dir, self.uuid, step_name)

apply_csv_filter(input_paths, subset_csv=None, storage_options={})

Filters a list of input_paths based on include/exclude logic given for either the full path, filename, or filestem.

If using "include" logic, only matching entries with include=True are kept. If using "exclude" logic, only matching entries with exclude=True are removed.

The original list is returned if the given subset_csv is None or empty.

Parameters:

Name Type Description Default
input_paths list[str]

list of input paths to filter

required
subset_csv str

path to a csv with subset/filter information/flags

None

Returns list[str]: filtered list Raises: RuntimeError: If the given subset_csv is invalid

Source code in src/luna/common/utils.py
def apply_csv_filter(input_paths, subset_csv=None, storage_options={}):
    """Filters a list of input_paths based on include/exclude logic given for either the full path, filename, or filestem.

    The csv must have two columns. The header of the first names what is
    matched ("path", "filename" or "stem"); the header of the second names the
    logic ("include" or "exclude") and its values are the per-row flags.

    If using "include" logic, only matching entries with include=True are kept.
    If using "exclude" logic, only matching entries with exclude=True are removed.

    The original list is returned if the given subset_csv is None or empty,
    if it does not exist on the local filesystem, or if it has no rows.

    Args:
        input_paths (list[str]): list of input paths to filter
        subset_csv (str): path to a csv with subset/filter information/flags
        storage_options (dict): passed through to ``pandas.read_csv``
    Returns
        list[str]: filtered list
    Raises:
        RuntimeError: If the given subset_csv is invalid
    """

    # Test for None/empty via truthiness: len(None) raised a TypeError with
    # the default argument before the None check was ever reached.
    if not subset_csv:
        return input_paths
    if not os.path.exists(subset_csv):
        return input_paths

    try:
        subset_df = pd.read_csv(
            subset_csv, dtype={0: str}, storage_options=storage_options
        )

        match_type = subset_df.columns[0]
        filter_logic = subset_df.columns[1]

        if match_type not in ["path", "filename", "stem"]:
            raise RuntimeError("Invalid match type column")
        if filter_logic not in ["include", "exclude"]:
            raise RuntimeError("Invalid filter logic column")
    except Exception as exc:
        logger.error(exc)
        raise RuntimeError(
            "Invalid subset .csv passed, must be a 2-column csv with headers = [ (path|filename|stem), (include|exclude) ]"
        ) from exc

    if len(subset_df) == 0:
        return input_paths

    logger.info(
        f"Applying csv filter, match_type={match_type}, filter_logic={filter_logic}"
    )

    input_path_df = pd.DataFrame(
        {"path": path, "filename": Path(path).name, "stem": Path(path).stem}
        for path in input_paths
    ).astype(str)

    # Left join: inputs without a csv row get NaN in the flag column.
    df_matches = input_path_df.set_index(match_type).join(
        subset_df.set_index(match_type)
    )

    flagged = df_matches[filter_logic] == 1
    if filter_logic == "include":
        out = df_matches.loc[flagged]
    else:
        # "exclude": drop the flagged rows and keep everything else (including
        # inputs the csv does not mention). Previously the flagged rows were
        # the ones returned, the opposite of the documented behaviour.
        out = df_matches.loc[~flagged]

    return list(out.reset_index()["path"])

generate_uuid(urlpath, prefix, storage_options={})

Returns hash of the file given path, preceded by the prefix. :param path: file path e.g. file:/path/to/file :param prefix: list e.g. ["SVGEOJSON","default-label"] :return: string uuid

Source code in src/luna/common/utils.py
def generate_uuid(urlpath: str, prefix, storage_options={}):
    """
    Returns hash of the file given path, preceded by the prefix.

    The hash is the filesystem checksum reported by fsspec for the path. The
    caller's ``prefix`` list is left unmodified.

    :param urlpath: file path or URL, e.g. file:/path/to/file
    :param prefix: list e.g. ["SVGEOJSON","default-label"]
    :param storage_options: keyword options passed to fsspec when resolving urlpath
    :return: string uuid
    """

    fs, path = fsspec.core.url_to_fs(urlpath, **storage_options)

    rec_hash = str(fs.checksum(path))
    # Build a new list instead of appending to the argument, so that reusing
    # the same prefix list across calls does not accumulate hashes.
    return "-".join([*prefix, rec_hash])

generate_uuid_binary(content, prefix)

Returns hash of the binary, preceded by the prefix. :param content: binary :param prefix: list e.g. ["FEATURE"] :return: string uuid

Source code in src/luna/common/utils.py
def generate_uuid_binary(content, prefix):
    """
    Deprecated: returns the prefix joined with a fixed placeholder id.

    No hash of ``content`` is computed; the id is always "00000000" and a
    warning is emitted on every call. The caller's ``prefix`` list is left
    unmodified.

    :param content: binary (unused)
    :param prefix: list e.g. ["FEATURE"]
    :return: string uuid
    """
    warnings.warn(
        "generate_uuid_binary() should not be used anymore, the UUIDs generated are not valid!"
    )

    uuid = "00000000"
    # Join on a new list rather than appending to the caller's list.
    return "-".join([*prefix, uuid])

generate_uuid_dict(json_str, prefix)

Returns hash of the json string, preceded by the prefix. :param json_str: str representation of json :param prefix: list e.g. ["SVGEOJSON","default-label"] :return: string uuid

Source code in src/luna/common/utils.py
def generate_uuid_dict(json_str, prefix):
    """
    Returns a placeholder id, preceded by the prefix.

    The json string is not hashed: every call emits a warning and uses the
    fixed id "00000000". The ``prefix`` sequence passed by the caller is not
    modified.

    :param json_str: str representation of json (unused)
    :param prefix: list e.g. ["SVGEOJSON","default-label"]
    :return: string uuid
    """
    warnings.warn(
        "generate_uuid_dict() should not be used anymore, the UUIDs generated are not valid!"
    )

    uuid = "00000000"
    # Build a new list instead of appending to the caller's prefix in place
    return "-".join([*prefix, uuid])

get_absolute_path(module_path, relative_path)

Given the path to a module file and the path, relative to the module file, of another file that needs to be referenced in the module, this method returns the absolute path of the file that needs to be referenced.

This method makes it possible to resolve absolute paths to files in any environment a module and the referenced files are deployed to.

:param module_path: path to the module. Use `__file__` from the module. :param relative_path: path to the file that needs to be referenced by the module. The path must be relative to the module. :return: absolute path to the file with the specified relative_path

Source code in src/luna/common/utils.py
def get_absolute_path(module_path, relative_path):
    """Resolve a path given relative to a module file into an absolute path.

    The directory containing ``module_path`` is used as the anchor, which lets a
    module locate files shipped next to it regardless of where it is deployed.

    :param module_path path to the module. Use '__file__' from the module.
    :param relative_path path to the referenced file, relative to the module.
    :return absolute path to file with the specified relative_path
    """
    module_dir = os.path.dirname(module_path)
    candidate = os.path.join(module_dir, relative_path)

    # realpath collapses any ../ segments (and symlinks) in the joined path
    return os.path.realpath(candidate)

get_config(cli_kwargs)

Get the config with merged OmegaConf files

Parameters:

Name Type Description Default
cli_kwargs dict

CLI keyword arguments

required
Source code in src/luna/common/utils.py
def get_config(cli_kwargs: dict):
    """Get the config with merged OmegaConf files

    The CLI keyword arguments are converted to an OmegaConf config. If they name
    a ``local_config`` file, it is loaded and placed ahead of the CLI config in
    the merge order. When the merged result has an ``output_urlpath``, the
    ``output_filesystem`` and (if not already set) ``output_storage_options``
    entries are derived from it.

    Args:
        cli_kwargs (dict): CLI keyword arguments

    Returns:
        the merged and resolved configuration container

    Raises:
        fire.core.FireError: if a mandatory value is missing from the merged config
    """
    cli_conf = OmegaConf.create(cli_kwargs)
    layers = [cli_conf]  # type: List[Union[ListConfig, DictConfig]]

    # Params from the param file go in front of the CLI params
    if cli_conf.get("local_config"):
        with open(cli_conf.local_config, "r", **cli_conf.storage_options) as f:
            layers.insert(0, OmegaConf.load(f))

    try:
        combined = OmegaConf.merge(*layers)
        merged_conf = OmegaConf.to_container(
            combined, resolve=True, throw_on_missing=True
        )
    except MissingMandatoryValue as e:
        raise fire.core.FireError(e)

    if not merged_conf.get("output_urlpath"):
        return merged_conf

    # A URL without a scheme is treated as a local file path
    scheme = urlparse(str(merged_conf.get("output_urlpath"))).scheme
    filesystem = "file" if scheme == "" else scheme
    merged_conf["output_filesystem"] = filesystem

    if not merged_conf.get("output_storage_options"):
        if filesystem == "file":
            merged_conf["output_storage_options"] = {"auto_mkdir": True}
        else:
            # Remote outputs fall back to the input storage options
            merged_conf["output_storage_options"] = merged_conf.get(
                "storage_options", {}
            )

    return merged_conf

get_dataset_url()

Retrieve a "dataset URL" from the environment, may look like http://localhost:6077 or file:///absolute/path/to/dataset/dir

Source code in src/luna/common/utils.py
def get_dataset_url():
    """Read the "dataset URL" from the DATASET_URL environment variable.

    The value may look like http://localhost:6077 or file:///absolute/path/to/dataset/dir.
    A warning is logged when the variable is unset, in which case None is returned.
    """
    dataset_url = os.environ.get("DATASET_URL")

    if dataset_url is not None:
        logger.info(f"Found dataset URL = {dataset_url}")
        return dataset_url

    logger.warning(
        "Requesting feature data be sent to dataset, however no dataset URL provided, please set env DATASET_URL!"
    )
    return dataset_url

grouper(iterable, n)

Turn an iterable into an iterable of iterables

'None' should not be a member of the input iterable as it is removed to handle the fillvalues

Parameters:

Name Type Description Default
iterable iterable

an iterable

required
n int

size of chunks

required

Returns:

Type Description

iterable[iterable]

Source code in src/luna/common/utils.py
def grouper(iterable, n):
    """Split an iterable into consecutive chunks of at most ``n`` items.

    The last chunk is padded with None internally and the padding is stripped
    again, so 'None' should not be a member of the input iterable: any None
    entries are removed from the output.

    Args:
        iterable (iterable): an iterable
        n (int): size of chunks

    Returns:
        list[list]: the chunks, in input order
    """
    # n references to one shared iterator: zip_longest pulls n items per row
    shared = iter(iterable)
    chunks = []
    for padded_row in itertools.zip_longest(*([shared] * n), fillvalue=None):
        chunks.append([item for item in padded_row if item is not None])
    return chunks

load_func(dotpath)

Load function in module from a parsed yaml string.

Parameters:

Name Type Description Default
dotpath str

module/function name written as a string (ie torchvision.models.resnet34)

required

Returns: The inferred module itself, not the string representation

Source code in src/luna/common/utils.py
def load_func(dotpath: str):
    """Import and return the object named by a dotted path.

    The text after the last dot is looked up as an attribute of the module named
    by the text before it.

    Args:
        dotpath (str): module/function name written as a string (ie torchvision.models.resnet34)
    Returns:
        The resolved attribute itself, not the string representation
    """
    module_name, attr_name = dotpath.rsplit(".", maxsplit=1)
    return getattr(import_module(module_name), attr_name)

local_cache_urlpath(file_key_write_mode={}, dir_key_write_mode={})

Decorator for caching url/paths locally

Source code in src/luna/common/utils.py
def local_cache_urlpath(
    file_key_write_mode: dict[str, str] = {},
    dir_key_write_mode: dict[str, str] = {},
):
    """Decorator for caching url/paths locally

    Both arguments map a parameter name of the decorated function to a mode
    string. Entries whose parameter is absent or falsy in a given call are
    skipped. A mode containing "w" selects the ``output_storage_options``
    argument for resolving the filesystem; otherwise ``storage_options`` is used.

    Args:
        file_key_write_mode (dict[str, str]): parameter name -> mode for file
            url/paths; the mode is passed to the simplecache ``open`` call
        dir_key_write_mode (dict[str, str]): parameter name -> mode for
            directory url/paths

    Returns:
        a decorator; the wrapped function is invoked with keyword arguments only
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # _get_args_dict is defined elsewhere; presumably it maps parameter
            # names to the values of this call -- confirm against its definition
            args_dict = _get_args_dict(func, args, kwargs)
            new_args_dict = args_dict.copy()

            # (temporary directory, original directory path, filesystem) triples
            # to copy back once func has returned
            tmp_dir_dest = []
            for key, write_mode in dir_key_write_mode.items():
                if key not in args_dict or not args_dict[key]:
                    continue
                storage_options_key = "storage_options"
                if "w" in write_mode:
                    storage_options_key = "output_storage_options"
                fs, dir = fsspec.core.url_to_fs(
                    args_dict[key], **args_dict.get(storage_options_key, {})
                )
                # NOTE(review): fs.protocol may be a tuple in some fsspec
                # versions, in which case `!= "file"` is always true -- confirm
                if fs.protocol != "file" and "cache" not in fs.protocol:
                    # Swap the directory argument for a fresh local temp dir and
                    # replace the matching storage options for the call
                    new_args_dict[storage_options_key] = {"auto_mkdir": True}
                    tmp_dir = tempfile.TemporaryDirectory()
                    new_args_dict[key] = tmp_dir.name
                    tmp_dir_dest.append((tmp_dir, dir, fs))

            result = None
            # The ExitStack keeps every cached file open for the duration of the
            # call and closes them all when the block exits
            with ExitStack() as stack:
                for key, write_mode in file_key_write_mode.items():
                    if key not in args_dict or not args_dict[key]:
                        continue
                    storage_options_key = "storage_options"
                    if "w" in write_mode:
                        storage_options_key = "output_storage_options"
                    fs, path = fsspec.core.url_to_fs(
                        args_dict[key], **args_dict.get(storage_options_key, {})
                    )
                    if "cache" not in fs.protocol:
                        simplecache_fs = fsspec.filesystem("simplecache", fs=fs)

                        of = simplecache_fs.open(path, write_mode)
                        stack.enter_context(of)
                        # func receives the name of the opened cache file in
                        # place of the original url/path
                        new_args_dict[key] = of.name

                result = func(**new_args_dict)

            # Copy each temp directory to its original destination; this runs for
            # every recorded directory, regardless of its write mode
            for tmp_dir, dest, fs in tmp_dir_dest:
                copy_files(tmp_dir.name, dest, destination_filesystem=fs)

            return result

        return wrapper

    return decorator

post_to_dataset(input_feature_data, waystation_url, dataset_id, keys)

Interface feature data to a parquet dataset.

Parameters:

Name Type Description Default
input_feature_data str

path to input data

required
waystation_url str

URL of dataset root (either file or using waystation)

required
dataset_id str

Dataset name/ID

required
keys dict

corresponding segment keys

required
Source code in src/luna/common/utils.py
def post_to_dataset(input_feature_data, waystation_url, dataset_id, keys):
    """Interface feature data to a parquet dataset.

    The target is ``<waystation_url>/datasets/<dataset_id>/segments/<segment_id>``,
    where the segment id joins the key values (ordered by key name) with "-".
    If the URL scheme contains "http" the input file is POSTed to that URL; if
    it contains "file" the input parquet is re-written as ``data.parquet`` in
    that directory with SEGMENT_ID and the key columns moved to the front. Any
    other scheme is logged and skipped.

    Args:
        input_feature_data (str): path to input data
        waystation_url (str): URL of dataset root (either file or using waystation)
        dataset_id (str): Dataset name/ID
        keys (dict): corresponding segment keys
    """

    logger.info(f"Adding {input_feature_data} to {dataset_id} via {waystation_url}")

    segment_id = "-".join([v for _, v in sorted(keys.items())])

    logger.info(f"SEGMENT_ID={segment_id}")

    post_url = os.path.join(
        waystation_url, "datasets", dataset_id, "segments", segment_id
    )

    parsed_url = urllib.parse.urlparse(post_url)

    if "http" in parsed_url.scheme:
        # The cool way, using luna waystation

        logger.info(f"Posting to: {post_url}")

        # Context manager so the upload handle is closed even if the request fails
        with open(input_feature_data, "rb") as segment_file:
            res = requests.post(
                post_url,
                files={"segment_data": segment_file},
                data={"segment_keys": json.dumps(keys)},
            )

        logger.info(f"{res}: {res.text}")

    elif "file" in parsed_url.scheme:
        # The less cool way, just using file paths

        segment_dir = Path(parsed_url.path)

        logger.info(f"Writing to: {segment_dir}")

        os.makedirs(segment_dir, exist_ok=True)

        data = pd.read_parquet(input_feature_data).reset_index()
        data = data.drop(columns="index", errors="ignore")
        data["SEGMENT_ID"] = segment_id
        re_indexors = ["SEGMENT_ID"]

        if keys is not None:
            for key, value in keys.items():
                data.loc[:, key] = value
                re_indexors.append(key)

        data = data.set_index(
            re_indexors
        ).reset_index()  # a trick to move columns to the left

        data.to_parquet(segment_dir.joinpath("data.parquet"))

    else:
        # f-string so the offending scheme actually appears in the log message
        logger.warning(f"Unrecognized scheme: {parsed_url.scheme}, skipping!")

save_metadata(func)

This decorator saves metadata in output_url

Source code in src/luna/common/utils.py
def save_metadata(func):
    """This decorator saves metadata in output_url

    The call's configuration (from get_config) is combined with the dict
    returned by the decorated function and dumped as ``metadata.yml`` under
    ``output_urlpath``. Nothing is written when the function returns None or no
    ``output_urlpath`` is present. The wrapper itself always returns None.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # The configuration is resolved before the wrapped function runs
        metadata = get_config(_get_args_dict(func, args, kwargs))
        result = func(*args, **kwargs)
        if result is None:
            return

        # Values returned by the function win over the call configuration
        metadata = metadata | result
        if "output_urlpath" not in metadata:
            return

        o = urlparse(str(metadata["output_urlpath"]))
        storage_options = metadata.get("output_storage_options", {})
        fs = fsspec.filesystem(o.scheme, **storage_options)
        target = Path(o.netloc + o.path) / "metadata.yml"
        with fs.open(target, "w") as f:
            yaml.dump(metadata, f)
        return

    return wrapper

timed(func)

This decorator prints the execution time for the decorated function.

Source code in src/luna/common/utils.py
def timed(func):
    """Decorator that logs, at debug level, how long the decorated function took.

    The elapsed wall-clock time is rounded to two decimals. Nothing is logged if
    the function raises. The function's return value is passed through.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        started_at = time.time()
        outcome = func(*args, **kwargs)
        elapsed = round(time.time() - started_at, 2)
        logger.debug("{} ran in {}s".format(func.__name__, elapsed))
        return outcome

    return wrapper

validate_dask_address(addr)

Return True if addr appears to be a valid address for a dask scheduler.

The typical format for this will be something like 'tcp://192.168.0.37:8786', but there could be a hostname instead of an IP address, and maybe some other URL schemes are supported. This function will be used to check whether a user-defined dask scheduler address is plausible, or obviously invalid.

Source code in src/luna/common/utils.py
def validate_dask_address(addr: str) -> bool:
    """
    Check whether `addr` looks like a plausible dask scheduler address.

    Accepted form is 'tcp://<host>:<port>', e.g. 'tcp://192.168.0.37:8786'. The
    host is either four dot-separated groups of 1-3 digits (the groups are not
    range-checked) or a hostname that starts with a letter and ends with a
    letter or digit. The port is one or more digits. This is only a plausibility
    check for user-supplied addresses; it does not contact the scheduler.
    """
    pattern = r"""^(?P<scheme> tcp):
        // (?P<host>\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3} |
           [A-Za-z][A-Za-z0-9.-]*[A-Za-z0-9] |
           [A-Za-z])
         : (?P<port>\d+)$"""
    # VERBOSE mode: the whitespace and newlines inside the pattern are ignored
    return re.match(pattern, addr, re.VERBOSE) is not None