from numbers import Number
import random
import math
import six
import numpy as np
from . import distributions as dist
from . import generate
class ClusterGenerator(object):
    """
    Structure to handle the input and create clusters according to it.

    On construction the parameters are validated and every per-cluster parameter is expanded into a sequence with
    one entry per cluster (``alpha_n`` gets one entry per feature). One :class:`Cluster` per cluster is then made
    available in ``self.clusters``.
    """

    def __init__(self, seed=1, n_samples=2000, n_feats=2, k=5, min_samples=0, possible_distributions=None,
                 distributions=None, mv=True, corr=0., compactness_factor=0.1, alpha_n=1,
                 scale=True, outliers=50, rotate=True, add_noise=0, n_noise=None, ki_coeff=3., **kwargs):
        """
        Args:
            seed (int): Seed for the generation of random values. Useful for consistency.
            n_samples (int): Number of samples to generate.
            n_feats (int): Number of dimensions/features for each sample.
            k (int or list of int): Number of clusters to generate. If input is a list, each element in it specifies
                the number of samples in each cluster. In that case, the number of clusters will be the length of
                the list.
            min_samples (int): Minimum number of samples in each cluster. If 0, the default minimum for a cluster
                with :math:`N` samples is :math:`N/(\\text{ki_coeff}*k)`.
            possible_distributions (list): List of distributions to randomly choose from. Each element in this list
                must either be a valid str (valid str are defined in :data:`~.distributions.valid_distributions`)
                OR a function which implements the distribution OR an instance of
                :class:`~.distributions.Distribution`.
                This parameter is overridden by ``distributions``, when set.
            distributions (str or function or .distributions.Distribution or list): Distribution to be used.
                If list, its length must be ``k``, and each element in the list must either be a valid str
                (indicating the distribution to be used) OR a function which implements the distribution OR a list
                of str/functions with length ``n_feats``.
                Instances of :class:`~.distributions.Distribution` can also be used.
                Valid str are defined in :data:`~.distributions.valid_distributions`.
            mv (bool or list of bool or None): Multivariate distributions or distributions defining
                intra-distances. If True, distributions define feature values (multivariate). If False,
                distributions define intra-distances.
                If None, this choice is made at random.
                If a list, its length must be ``k``, and each value in the list applies to one cluster.
            corr (float or list of float): Maximum (in absolute value) correlation between variables.
                If a list, its length must be ``k``, and each value in the list applies to one cluster.
            compactness_factor (float or list of float): Compactness factor.
                If a list, its length must be ``k``, and each value in the list applies to one cluster.
            alpha_n (float or list of float): Determines grid hyperplanes. If :math:`\\alpha_n > 0`, the number of
                hyperplanes is a factor of
                :math:`\\alpha_n * \\left \\lfloor{1 + \\frac{k}{\\log(k)}}\\right \\rfloor`.
                If :math:`\\alpha_n < 0`, the number of hyperplanes is :math:`|\\alpha_n|`.
                If a list, its length must be ``n_feats``, and each value in the list applies to one dimension.
            scale (bool or list of bool): Optimizes cluster separation based on grid size. If True, scale based on
                min distance between grid hyperplanes. If False, scale based on max distance between grid
                hyperplanes.
                If None, does not scale.
                If a list, its length must be ``k``, and each value in the list applies to one cluster.
            outliers (int): Number of outliers.
            rotate (bool or list of bool): If True, clusters can rotate.
                If a list, its length must be ``k``, and each value in the list applies to one cluster.
            add_noise (int): Add this number of noisy dimensions.
            n_noise (list): Parameter that manages noisy dimensions.
                If a list of int (of size :math:`\\leq` ``n_feats``, and each element is :math:`\\geq 0` and
                :math:`<` ``n_feats``), each dimension listed (0-indexed) will have only noise.
                If a list of list of int (of length ``k``, and each element is a list of length :math:`\\leq`
                ``n_feats``, with values :math:`\\geq 0` and :math:`<` ``n_feats``), each list indicates the noisy
                dimensions for a particular cluster.
            ki_coeff (float): Coefficient used to define the default minimum number of samples per cluster.
            **kwargs: Extra attributes, written directly into the instance ``__dict__`` (they may overwrite the
                attributes set from the named parameters).

        Raises:
            ValueError: If any parameter fails validation (see :meth:`_validate_parameters`).
        """
        self.seed = seed
        self.n_samples = n_samples
        self.n_feats = n_feats
        self.k = k
        # Use the same "is it a sequence" test as _validate_parameters, so tuples/arrays are counted like lists.
        self.n_clusters = len(k) if hasattr(k, '__iter__') else k
        self.min_samples = min_samples
        self.possible_distributions = possible_distributions if possible_distributions is not None \
            else ['gaussian', 'uniform']
        self.distributions = distributions
        self.mv = mv
        self.corr = corr
        self.compactness_factor = compactness_factor
        self.alpha_n = alpha_n
        self._cmax = None
        self.scale = scale
        self.outliers = outliers
        self.rotate = rotate
        self.add_noise = add_noise
        self.n_noise = n_noise if n_noise is not None else []
        self.ki_coeff = ki_coeff

        random.seed(self.seed)

        # Written straight into __dict__ (not setattr) so that properties such as ``mass`` are not triggered.
        for key, val in kwargs.items():
            self.__dict__[key] = val

        self._distributions = None
        self._validate_parameters()

        self.clusters = self.get_cluster_configs()

        # Filled in by generate_data().
        self._mass = None
        self._centroids = None
        self._locis = None
        self._idx = None

    def generate_data(self, batch_size=0):
        """
        Generate the data described by this configuration.

        Seeds numpy's global RNG with ``self.seed``, then delegates to :func:`.generate.generate_mass`,
        :func:`.generate.locate_centroids` and :func:`.generate.generate_clusters`, storing the intermediate
        results in ``self._mass``, ``self._centroids``, ``self._locis`` and ``self._idx``.

        Args:
            batch_size (int): Forwarded to :func:`.generate.generate_clusters`. If 0, the first item produced is
                returned directly instead of the iterator.

        Returns:
            The first item of the batch iterator when ``batch_size == 0``; otherwise the batch iterator itself.
        """
        np.random.seed(self.seed)
        self._mass = generate.generate_mass(self)
        self._centroids, self._locis, self._idx = generate.locate_centroids(self)
        batches = generate.generate_clusters(self, batch_size)
        if batch_size == 0:  # if batch_size == 0, just return the data instead of the generator
            return next(batches)
        return batches

    def get_cluster_configs(self):
        """
        Build one :class:`Cluster` per cluster index.

        Returns:
            list of Cluster: ``self.n_clusters`` objects, each bound to this generator and to its own index.
        """
        return [Cluster(self, i) for i in range(self.n_clusters)]

    @staticmethod
    def _is_listlike(value):
        """Return True if ``value`` is iterable and is not a string."""
        return hasattr(value, '__iter__') and not isinstance(value, six.string_types)

    @staticmethod
    def _expand(value, size, error_message):
        """
        Turn a scalar parameter into a list of ``size`` copies; iterables are returned unchanged after checking
        that their length is ``size`` (``ValueError(error_message)`` otherwise).
        """
        if hasattr(value, '__iter__'):
            if len(value) != size:
                raise ValueError(error_message)
            return value
        return [value] * size

    def _validate_parameters(self):
        """
        Method to validate the parameters of the object.

        Besides validating, this normalises the parameters in place: per-cluster parameters become sequences of
        length ``n_clusters``, ``alpha_n`` becomes a sequence of length ``n_feats``, ``self._distributions`` is
        set from :func:`.distributions.check_input`, ``self._cmax`` is computed and ``compactness_factor`` is
        rescaled according to ``scale``.

        Raises:
            ValueError: If any parameter is invalid.
        """
        # --- k ---
        if hasattr(self.k, '__iter__'):
            if len(self.k) == 1:  # if only one input, no point in being a list
                self.k = self.k[0]
                self.n_clusters = self.k
            elif len(self.k) < 1:
                raise ValueError('"k" parameter must have at least one value!')
            elif sum(self.k) != self.n_samples:
                raise ValueError('Total number of points must be the same as the sum of points in each cluster!')

        # --- distributions: validated and turned into one entry per cluster ---
        if self.distributions is not None:
            # Strings are iterable on Python 3, so they must be excluded explicitly: a distribution name is a
            # single distribution, not a per-feature list.
            if self._is_listlike(self.distributions):
                if len(self.distributions) != self.n_clusters:
                    raise ValueError('There must be exactly one distribution input for each cluster!')
                if self._is_listlike(self.distributions[0]):
                    if not all(self._is_listlike(elem) and len(elem) == self.n_feats
                               for elem in self.distributions):
                        raise ValueError(
                            'Invalid distributions input! Input must have dimensions (n_clusters, n_feats).')
            else:
                self.distributions = [self.distributions] * self.n_clusters
        else:
            self.distributions = [random.choice(self.possible_distributions) for _ in range(self.n_clusters)]
        self._distributions = dist.check_input(self.distributions)

        # --- mv: one entry per cluster; None means "pick at random" ---
        if self.mv is None:
            self.mv = [random.choice([True, False]) for _ in range(self.n_clusters)]
        else:
            self.mv = self._expand(self.mv, self.n_clusters,
                                   'There must be exactly one "mv" parameter for each cluster!')
        # Validators raise ValueError themselves; they are called directly (not through ``assert``) so that the
        # checks still run when Python is started with -O.
        for elem in self.mv:
            _validate_mv(elem)

        # --- scale: one entry per cluster ---
        self.scale = self._expand(self.scale, self.n_clusters,
                                  'There must be exactly one "scale" parameter for each cluster!')
        for elem in self.scale:
            _validate_scale(elem)

        # --- corr: one entry per cluster ---
        self.corr = self._expand(self.corr, self.n_clusters,
                                 'There must be exactly one correlation "corr" value for each cluster!')
        for elem in self.corr:
            _validate_corr(elem)

        # --- alpha_n: one entry per feature ---
        self.alpha_n = self._expand(
            self.alpha_n, self.n_feats,
            'There must be exactly one hyperplane parameter "alpha_n" value for each dimension!')
        for elem in self.alpha_n:
            _validate_alpha_n(elem)

        # --- _cmax: per-feature value derived from alpha_n ---
        if self.n_clusters > 1:
            base = math.floor(1 + self.n_clusters / math.log(self.n_clusters))
        else:
            # log(1) == 0, so the single-cluster case uses a fixed value depending on the outliers.
            base = 1 + 2 * (self.outliers > 1)
        # Negative alpha_n gives the count directly; positive alpha_n is a multiplier of the base value.
        self._cmax = np.array([round(-a) if a < 0 else round(base * a) for a in self.alpha_n])

        # --- compactness_factor: one entry per cluster, then rescaled according to ``scale`` ---
        self.compactness_factor = self._expand(
            self.compactness_factor, self.n_clusters,
            'There must be exactly one compactness "compactness_factor" value for each cluster!')
        for elem in self.compactness_factor:
            _validate_compactness_factor(elem)
        cmax_max = max(self._cmax)
        cmax_min = min(self._cmax)
        scaled = []
        for cp, s in zip(self.compactness_factor, self.scale):
            if s is None:
                scaled.append(cp)  # scale=None is documented as "does not scale"
            elif s:
                scaled.append(cp / cmax_max)
            else:
                scaled.append(cp / cmax_min)
        self.compactness_factor = scaled

        # --- rotate: one entry per cluster ---
        self.rotate = self._expand(self.rotate, self.n_clusters,
                                   'There must be exactly one rotate value for each cluster!')
        for elem in self.rotate:
            _validate_rotate(elem)

        # --- add_noise / n_noise ---
        if not isinstance(self.add_noise, six.integer_types):
            raise ValueError('Invalid input for "add_noise"! Input must be integer.')
        if not hasattr(self.n_noise, '__iter__'):
            raise ValueError('Invalid input for "n_noise"! Input must be a list.')
        if len(self.n_noise) == 0:
            # One independent empty list per cluster (not ``[[]] * n``, which would alias a single list).
            self.n_noise = [[] for _ in range(self.n_clusters)]
        if hasattr(self.n_noise[0], '__iter__'):
            if len(self.n_noise) != self.n_clusters:
                raise ValueError('Invalid input for "n_noise"! List length must be the number of clusters.')
        else:
            # A flat list applies to every cluster; each cluster gets its own copy.
            self.n_noise = [list(self.n_noise) for _ in range(self.n_clusters)]
        for elem in self.n_noise:
            _validate_n_noise(elem, self.n_feats)

    @property
    def mass(self):
        """Value stored in ``self._mass`` (``None`` until :meth:`generate_data` has been called)."""
        return self._mass
class Cluster(object):
    """
    Contains the parameters of an individual cluster.

    A cluster holds no parameter values of its own: each property reads from (and each setter writes to) position
    ``idx`` of the corresponding per-cluster sequence stored on the :class:`ClusterGenerator` given as ``cfg``.
    """

    settables = ['distributions', 'mv', 'corr', 'compactness_factor', 'scale', 'rotate', 'n_noise']
    """
    List of settable properties of Cluster. These are the parameters which can be set at a cluster level, and override
    the parameters of the cluster generator.
    """

    def __init__(self, cfg, idx, corr_matrix=None):
        """
        Args:
            cfg (ClusterGenerator): Configuration of the data.
            idx (int): Index of a cluster.
            corr_matrix (np.array): Valid correlation matrix to use in this cluster.
        """
        self.cfg = cfg
        self.idx = idx
        self.corr_matrix = corr_matrix

    def generate_data(self, samples):
        """
        Draw ``samples`` points from this cluster's distribution(s).

        Args:
            samples (int): Number of points to draw.

        Returns:
            If this cluster has one distribution per feature, an ``np.ndarray`` of shape
            ``(samples, n_feats)`` whose column ``f`` comes from ``distributions[f](samples, mv,
            compactness_factor)``. Otherwise, whatever the single distribution returns when called with
            ``((samples, n_feats), mv, compactness_factor)``.
        """
        if hasattr(self.distributions, '__iter__'):
            out = np.zeros((samples, self.cfg.n_feats))
            for f in range(self.cfg.n_feats):
                out[:, f] = self.distributions[f](samples, self.mv, self.compactness_factor)
            return out
        return self.distributions((samples, self.cfg.n_feats), self.mv, self.compactness_factor)

    @property
    def n_feats(self):
        """Number of features, taken from the generator configuration."""
        return self.cfg.n_feats

    @property
    def distributions(self):
        """Entry of ``cfg._distributions`` for this cluster."""
        return self.cfg._distributions[self.idx]

    @distributions.setter
    def distributions(self, value):
        # A non-string iterable is resolved element by element; anything else (including a string) is
        # resolved as a single distribution.
        if hasattr(value, '__iter__') and not isinstance(value, six.string_types):
            self.cfg._distributions[self.idx] = [dist.get_dist_function(d) for d in value]
        else:
            self.cfg._distributions[self.idx] = dist.get_dist_function(value)

    # NOTE: the setters below call the validators directly instead of through ``assert``. The validators raise
    # ValueError on bad input, and ``assert`` statements are removed when Python runs with -O.

    @property
    def mv(self):
        """``mv`` value for this cluster."""
        return self.cfg.mv[self.idx]

    @mv.setter
    def mv(self, value):
        _validate_mv(value)
        self.cfg.mv[self.idx] = value

    @property
    def corr(self):
        """``corr`` value for this cluster."""
        return self.cfg.corr[self.idx]

    @corr.setter
    def corr(self, value):
        _validate_corr(value)
        self.cfg.corr[self.idx] = value

    @property
    def compactness_factor(self):
        """``compactness_factor`` value for this cluster, as stored on the generator."""
        return self.cfg.compactness_factor[self.idx]

    @compactness_factor.setter
    def compactness_factor(self, value):
        _validate_compactness_factor(value)
        self.cfg.compactness_factor[self.idx] = value

    @property
    def scale(self):
        """``scale`` value for this cluster."""
        return self.cfg.scale[self.idx]

    @scale.setter
    def scale(self, value):
        _validate_scale(value)
        self.cfg.scale[self.idx] = value

    @property
    def rotate(self):
        """``rotate`` value for this cluster."""
        return self.cfg.rotate[self.idx]

    @rotate.setter
    def rotate(self, value):
        _validate_rotate(value)
        self.cfg.rotate[self.idx] = value

    @property
    def n_noise(self):
        """``n_noise`` entry (noisy dimensions) for this cluster."""
        return self.cfg.n_noise[self.idx]

    @n_noise.setter
    def n_noise(self, value):
        _validate_n_noise(value, self.cfg.n_feats)
        self.cfg.n_noise[self.idx] = value
class ScheduledClusterGenerator(ClusterGenerator):
    """
    Cluster generator driven by a schedule: besides all the :class:`ClusterGenerator` arguments it takes a
    schedule, and at each time step only the clusters listed for that step are active.

    A time step is one read of ``self.mass``, which is done when generating each new batch.
    That is, one time step is one call to :func:`.generate.compute_batch`.
    """

    def __init__(self, schedule, *args, **kwargs):
        """
        Args:
            schedule (list): List in which each element contains the indexes of the clusters active in the
                respective time step.
            *args: args for :meth:`ClusterGenerator.__init__`.
            **kwargs: kwargs for :meth:`ClusterGenerator.__init__`.
        """
        super(ScheduledClusterGenerator, self).__init__(*args, **kwargs)
        self.cur_time = 0
        self.schedule = schedule

    @property
    def mass(self):
        """
        Copy of ``self._mass`` in which every cluster not scheduled for the current time step has mass 0.

        Reading this property advances ``self.cur_time`` by one; the schedule is cycled through repeatedly.
        """
        masked = self._mass.copy()
        step = self.cur_time % len(self.schedule)
        active = self.schedule[step]
        # indexes of the clusters that are switched off during this time step
        inactive = [pos for pos in range(len(masked)) if pos not in active]
        for pos in inactive:
            masked[pos] = 0
        self.cur_time += 1  # move on to the next time step
        return masked
def _validate_mv(mv):
"""
Checks validity of input for `mv`.
Args:
mv (bool): Input to check validity
Returns:
bool: True if valid. Raises exception if not.
"""
if mv not in [True, None, False]:
raise ValueError('Invalid input value for "mv"!')
return True
def _validate_corr(corr):
"""
Checks validity of input for `corr`.
Args:
corr (float): Input to check validity.
Returns:
bool: True if valid. Raises exception if not.
"""
if not isinstance(corr, Number):
raise ValueError('Invalid input value for "corr"! Values must be numeric')
if not 0 <= corr <= 1:
raise ValueError('Invalid input value for "corr"! Values must be between 0 and 1.')
return True
def _validate_compactness_factor(compactness_factor):
"""
Checks validity of input for `compactness_factor`.
Args:
compactness_factor (float): Input to check validity.
Returns:
bool: True if valid. Raises exception if not.
"""
if not isinstance(compactness_factor, Number):
raise ValueError('Invalid input value for "compactness_factor"! Values must be numeric')
# TODO 0 <= compactness_factor <= 1 ?
return True
def _validate_alpha_n(alpha_n):
"""
Checks validity of input for `alpha_n`.
Args:
alpha_n (float): Input to check validity.
Returns:
bool: True if valid. Raises exception if not.
"""
if not isinstance(alpha_n, Number):
raise ValueError('Invalid input for "alpha_n"! Values must be numeric.')
if alpha_n == 0:
raise ValueError('Invalid input for "alpha_n"! Values must be different from 0.')
return True
def _validate_scale(scale):
"""
Checks validity of input for `scale`.
Args:
scale (bool): Input to check validity.
Returns:
bool: True if valid. Raises exception if not.
"""
if scale not in [True, None, False]:
raise ValueError('Invalid input value for "scale"! Input must be boolean (or None).')
return True
def _validate_rotate(rotate):
"""
Checks validity of input for `rotate`.
Args:
rotate (bool): Input to check validity.
Returns:
bool: True if valid. Raises exception if not.
"""
if rotate not in [True, False]:
raise ValueError('Invalid input for "rotate"! Input must be boolean.')
return True
def _validate_n_noise(n_noise, n_feats):
"""
Checks validity of input for `n_noise`.
Args:
n_noise (list of int): Input to check validity.
n_feats (int): Number of dimensions/features.
Returns:
"""
if not hasattr(n_noise, '__iter__'):
raise ValueError('Invalid input for "n_noise"! Input must be a list.')
if len(n_noise) > n_feats:
raise ValueError('Invalid input for "n_noise"! Input has more dimensions than total number of dimensions.')
if not all(isinstance(n, six.integer_types) for n in n_noise):
raise ValueError('Invalid input for "n_noise"! Input dimensions must be integers.')
if not all(0 <= n < n_feats for n in n_noise):
raise ValueError('Invalid input for "n_noise"! Input dimensions must be in the interval [0, "n_feats"[.')
return True