#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# This file is part of the pattern-clustering project.
# https://github.com/nokia/pattern-clustering
"""Boost C++ functions wrapper."""
__author__ = "Marc-Olivier Buob, Maxime Raynal"
__maintainer__ = "Marc-Olivier Buob, Maxime Raynal"
__email__ = "marc-olivier.buob@nokia-bell-labs.com, maxime.raynal@nokia.com"
__copyright__ = "Copyright (C) 2022, Nokia"
__license__ = "Nokia"
import multiprocessing, string, sys
try:
# Import from C++
# Naming convention: symbols from pc_boost are prefixed by _ to prevent
# them to clash with those from the python module.
from pattern_clustering.pattern_clustering import PatternAutomaton as _PatternAutomaton
from pattern_clustering.pattern_clustering import pattern_distance as _pattern_distance
from pattern_clustering.pattern_clustering import pattern_clustering as _pattern_clustering
from pattern_clustering.pattern_clustering import pattern_distance_normalized
except ImportError:
print("pattern_clustering is not yet installed and so the C++ objects cannot be imported!", file=sys.stderr)
import sys
print(sys.path)
sys.exit()
from .language_density import language_density
from .pattern_automaton import *
from .regexp import make_dfa_any, make_map_name_dfa
[docs]class PatternClusteringEnv:
"""
Stores the pattern clustering settings.
"""
def __init__(self, names: list = None, alphabet: set = None):
"""
Creates the parameters required to run pattern distance
and pattern clustering computations.
Args:
names (list[str]): List storing the pattern names taken into account
(see default patterns in `pattern_clustering.pattern`).
alphabet (set): The set of characters supported by the crafted
`PatternAutomaton` instances.
"""
if not alphabet:
alphabet = set(string.printable)
self.alphabet = alphabet
# `names` refers to types matched by MultiGrep to build pattern automata.
# Thus, you should not include "any" to improve performance.
if not names:
names = ["float", "hexa", "int", "ipv4", "spaces", "uint", "word"]
self.map_name_dfa = make_map_name_dfa(names)
# Note that the density of "any" is required in the latter.
self.map_name_density = {
name: language_density(dfa, alphabet)
for (name, dfa) in self.map_name_dfa.items()
}
if "any" not in self.map_name_density.keys():
self.map_name_density["any"] = language_density(make_dfa_any(alphabet), alphabet)
@property
def densities(self) -> list:
"""
Returns:
The densities assigned to each pattern defined in `self.map_name_dfa`.
"""
return make_densities(self.map_name_density)
[docs]def make_pattern_automaton(w: str, map_name_dfa: dict, make_mg=None):
"""
Builds a `PatternAutomaton` C++ instance from a input string.
Args:
w (str): The string to convert.
map_name_dfa (dict): Maps each pattern name (`str`) with its corresponding `Automaton`.
make_mg (MultiGrepFunctor): The strategy used to build the PatternAutomaton,
defaults to `None`.
Returns:
The `PatternAutomaton` C++ instance.
"""
# Transform python PatternAutomaton to a C++ PatternAutomaton
g = PatternAutomaton(w, map_name_dfa, make_mg)
if "any" not in map_name_dfa.keys():
# If some string is not caught by any pattern, it results to an "any" arc in
# in the PatternAutomaton, and the "any" pattern might not be declared in
# map_name_dfa.
map_name_dfa["any"] = make_dfa_any()
map_name_id = {k: i for (i, k) in enumerate(sorted(map_name_dfa.keys()))}
n = len(w) + 1
_g = _PatternAutomaton(n, len(map_name_dfa), w)
for e in edges(g):
q = source(e, g)
r = target(e, g)
a = map_name_id[label(e, g)]
_g.add_edge(q, r, a)
return _g
# Default parameters
PATTERN_CLUSTERING_ENV = PatternClusteringEnv(names=None, alphabet=None)
[docs]def make_densities(map_name_density: dict = None) -> list:
"""
Builds the language density vector.
Args:
map_name_density (dict): Maps mapping each pattern name (`str`)
with the corresponding density (`float`).
Returns:
The corresponding densities, sorted by increasing pattern name.
"""
if not map_name_density:
map_name_density = PATTERN_CLUSTERING_ENV.map_name_density
return [map_name_density[name] for name in sorted(map_name_density.keys())]
# Default parameters
INFINITY = 100000
[docs]def pattern_distance(
w1: str,
w2: str,
map_name_dfa: dict = None,
densities: list = None,
infinity: float = INFINITY,
normalized: bool = False
) -> float:
"""
Compute the pattern distance between two strings.
Args:
w1 (str): The first compared string.
w2 (str): The second compared string.
map_name_dfa (dict): Maps each pattern name (`str`) with its corresponding `Automaton`.
densities (list): A density vector. See `make_densities()`.
infinity (float): The infinite distance.
normalized (bool): Pass `True` to get a distance in [0, 1],
otherwise in [0, len(w1) + len(w2)]
Returns:
The corresponding distance.
"""
if not map_name_dfa:
map_name_dfa = PATTERN_CLUSTERING_ENV.map_name_dfa
if not densities:
densities = PATTERN_CLUSTERING_ENV.densities
g1 = make_pattern_automaton(w1, map_name_dfa)
g2 = make_pattern_automaton(w2, map_name_dfa)
return (
pattern_distance_normalized(g1, g2, densities, infinity) if normalized else
_pattern_distance(g1, g2, densities, infinity)
)
# pool.starmap prevents to use a lambda.
[docs]def make_pattern_automaton_python(w: str, map_name_dfa: dict, make_mg: MultiGrepFunctor = None):
"""
`PatternAutomaton.__init__` wrapper.
Args:
w (str): The string to convert.
map_name_dfa (dict): Maps each pattern name (`str`) with its corresponding `Automaton`.
make_mg (MultiGrepFunctor): The strategy used to build the PatternAutomaton,
defaults to `None`.
Returns:
The corresponding `PatternAutomaton` python instance.
"""
return PatternAutomaton(w, map_name_dfa, make_mg)
[docs]def make_pattern_automata(
lines: list,
map_name_dfa: dict,
make_mg: callable = None
) -> list:
"""
Converts input lines to the `PatternAutomaton` C++ instances.
Args:
lines (list): A list gathering the input lines (`str`).
map_name_dfa (dict): Maps each pattern name (`str`) with its corresponding `Automaton`.
make_mg: A `MultiGrepFunctor` instance.
Returns:
The corresponding list of pattern automata.
"""
# Transform python PatternAutomaton to a C++ PatternAutomaton
def to_pc_boost_pattern_automaton(g: PatternAutomaton):
map_name_id = {k: i for (i, k) in enumerate(sorted(map_name_dfa.keys()))}
n = len(g.w) + 1
_g = _PatternAutomaton(n, len(map_name_dfa), g.w)
for e in edges(g):
q = source(e, g)
r = target(e, g)
a = map_name_id[label(e, g)]
_g.add_edge(q, r, a)
return _g
# Parallelize PatternAutomata construction
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
pas = pool.starmap(
make_pattern_automaton_python,
[(line, map_name_dfa, make_mg) for line in lines]
)
return [
to_pc_boost_pattern_automaton(pa)
for pa in pas
]
[docs]def pattern_clustering_without_preprocess(
lines: list,
map_name_dfa: dict = None,
densities: list = None,
max_dist: float = 0.6,
use_async: bool = True,
make_mg: callable = None
) -> list:
"""
Computes the pattern clustering of input lines without aggregating duplicated PAs.
Args:
lines: A `list(str)` gathering the input lines.
map_name_dfa: A `dict{str : Automaton}` mapping each pattern name
with the corresponding Automaton.
densities: A density vector. See `make_densities()`.
max_dist: The maximum distance between an element of a cluster and the
cluster representative. As distances are normalized, this value should
be lower than 1.0.
use_async: Pass `True` to run computations using async calls. This accelerates
computations.
make_mg: A `MultiGrepFunctor` instance.
Returns:
A `list(int)` mapping each line index with its corresponding cluster identifier.
"""
if not map_name_dfa:
map_name_dfa = PATTERN_CLUSTERING_ENV.map_name_dfa
if not densities:
densities = PATTERN_CLUSTERING_ENV.densities
pattern_automata = make_pattern_automata(lines, map_name_dfa, make_mg)
return _pattern_clustering(pattern_automata, densities, max_dist, use_async)
[docs]def group_by_identical_pa(pas: list, are_equal: callable = None) -> dict:
"""
Groups matching `PatternAutomaton` python instances.
Args:
pas: A `list(PatternAutomaton)` instance.
are_equal: A `callable(PatternAutomaton, PatternAutomaton) -> bool`
checking whether two PAs are homomorphic (pass `None`
if the compared PAs are minimal to accelerate the processing).
Returns:
A dictionary mapping each reference PAs to the matching instances found in `pas`.
"""
if are_equal is None:
def are_equal(pa1, pa2):
return pa1 == pa2
ref_pa_indices = list()
map_refpa_pas = dict()
for (i, pa) in enumerate(pas):
ref_pai = next(
(
ref_pai
for ref_pai in ref_pa_indices
if are_equal(pas[ref_pai], pa)
),
None
)
if ref_pai is not None:
map_refpa_pas[ref_pai].append(i)
else:
ref_pa_indices.append(i)
map_refpa_pas[i] = [i]
return map_refpa_pas
[docs]def pattern_clustering_with_preprocess(
lines: list,
map_name_dfa: dict = None,
densities: list = None,
max_dist: float = 0.6,
use_async: bool = True,
make_mg: callable = None
) -> list:
"""
Computes the pattern clustering of input lines by grouping matching PAs.
This implies that lines having matching PatternAutomaton always fall in the same clusters
which accelerate the code. Sometimes, this may lead to weird cluster, especially if some
lines are unrelated and conform to the same `PatternAutomaton`.
Args:
lines: A `list(str)` gathering the input lines.
map_name_dfa: A `dict{str : Automaton}` mapping each pattern name
with the corresponding Automaton.
densities: A density vector. See `make_densities()`.
max_dist: The maximum distance between an element of a cluster and the
cluster representative. As distances are normalized, this value should
be lower than 1.0.
use_async: Pass `True` to run computations using async calls. This accelerates
computations.
make_mg: A `MultiGrepFunctor` instance.
Returns:
A `list(int)` mapping each line index with its corresponding cluster identifier.
"""
if not map_name_dfa:
map_name_dfa = PATTERN_CLUSTERING_ENV.map_name_dfa
if not densities:
densities = PATTERN_CLUSTERING_ENV.densities
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
pas = pool.starmap(
make_pattern_automaton_python,
[(line, map_name_dfa) for line in lines]
)
# Group pattern automat by distinct PA. As PAs are indexed by rows,
# this indexes rows by reference row.
map_refrow_rows = group_by_identical_pa(pas)
ref_rows = [ref_row for ref_row in map_refrow_rows.keys()]
# Run the pattern clustering only for the reference lines.
distinct_lines = [lines[row] for row in ref_rows]
ref_clusters = _pattern_clustering(
make_pattern_automata(distinct_lines, map_name_dfa, make_mg),
densities,
max_dist,
use_async
)
# Map each row with its corresponding cluster
clusters = [None] * len(lines)
for (ref_row, cluster) in zip(ref_rows, ref_clusters):
clusters[ref_row] = cluster
for row in map_refrow_rows[ref_row]:
clusters[row] = cluster
return clusters
pattern_clustering = pattern_clustering_with_preprocess