# Copyright 2023 AntGroup CO., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import numpy as np
from pyhocon import ConfigTree
from .bin import BinMapper
from .dataset import Dataset
from .information import CausalDataInfo
from .cppnode import predict
from .utils import to_row_major, INFO, set_log_level_cpp
from .losses import Loss
class Boosting(object):
    """
    Gradient-boosting driver that fits a sequence of causal trees.

    Every boosting round builds one tree of type ``tree_cls`` on the current
    gradients, adds the tree's shrunken output to the running predictions and
    records a validation loss. The recorded losses drive early stopping and
    decide how many trees are finally kept.
    """

    def __init__(self, tree_cls, conf: "ConfigTree", bin_mapper: "BinMapper" = None):
        """
        Set up the booster from a configuration tree.

        Arguments:
            tree_cls: Class used to build each tree; it is instantiated as
                ``tree_cls(conf, bin_mapper, verbose=...)``.
            conf: Configuration providing ``n_estimators``, ``feature_ratio``,
                ``instance_ratio``, ``learning_rate``, ``verbose``,
                ``n_iter_no_change`` and ``tol``, plus the optional keys
                ``print_intervels`` (default 5) and ``loglevel``.
            bin_mapper: Mapper to reuse. When None, a new ``BinMapper`` is built from ``conf``.
        """
        self.info = CausalDataInfo(conf)
        self.conf = conf
        self.n_estimators = conf.n_estimators
        # Trees fitted so far, in boosting order.
        self.trees = []
        self.subfeature = conf.feature_ratio
        self.subsample = conf.instance_ratio
        self.learning_rate = conf.learning_rate
        # One combined validation loss per boosting round (see `_validation`).
        self.valid_losses = []
        self.bin_mapper = bin_mapper if bin_mapper is not None else BinMapper(conf)
        self.verbose = conf.verbose
        self.print_intervels = conf.get('print_intervels', 5)
        self.tree_cls = tree_cls
        self.n_iter_no_change = conf.n_iter_no_change
        level = conf.get("loglevel", None)
        if level is not None:
            set_log_level_cpp(level)
        self.tol = conf.tol

    def fit(self, data: "Dataset"):
        """
        Fit the boosted causal trees on the provided dataset.

        Arguments:
            data: Dataset object containing the input features, targets, and treatment.
        """
        params = self.preprocess(data)
        interval = self.print_intervels
        for i in range(self.n_estimators):
            # Progress is logged only when verbose is exactly True.
            log_round = self.verbose is True and i % interval == 0
            if log_round:
                INFO(f'{"="*20}{i}-th tree{"="*20}')
            tree = self.tree_cls(self.conf, self.bin_mapper, verbose=self.verbose)
            g, h = tree.gradients(params['target'], params['pred'])
            cg, ch = tree.gradients(params['target'], params['cpred'])
            # A fresh train/validation split is drawn for every round.
            tr_data, _, idx, val_idx = self.tr_val(data)
            tree.fit((g[idx], h[idx]), (cg[idx], ch[idx]), tr_data, params['eta'][idx])
            # Update the running predictions in place with the new tree.
            self._update_paramers(tree=tree, **params)
            # Validation loss (this also appends to self.valid_losses).
            preloss, postloss = self._validation(
                params['target'][val_idx], params['pred'][val_idx], params['cpred'][val_idx]
            )
            if log_round and len(self.valid_losses) > 0:
                INFO(f'Debias loss:{preloss:.3f}\tpostloss:{postloss:.3f}\ttotal:{self.valid_losses[-1]:.3f}')
            self.trees.append(tree)
            if self.early_stopping():
                if self.verbose is True:
                    INFO('early stop')
                break
        self.postprocess()

    def preprocess(self, data: "Dataset"):
        """
        Validate the dataset, fit the bin mapper and allocate the boosting state.

        Arguments:
            data: Dataset object containing the input features, targets, and treatment.
        Returns:
            Dictionary with the running predictions (``pred``, ``cpred``), the
            row-major ``target``, ``features`` and ``treatment`` arrays, the
            ``eta`` accumulator and the ``out`` buffers handed to ``tree.predict``.
        """
        if self.info.feature_columns is None:
            self.info.feature_columns = data.feature_columns
            self.conf.put('dataset.feature', data.feature_columns)
        # Check the data before anything is fitted on it.
        self.check_data(data)
        self.bin_mapper.fit_dataset(data)
        target = to_row_major(data.targets)
        # Scratch buffers passed to `tree.predict` through the `out` argument.
        cur_pred = np.zeros_like(data.targets, dtype=target.dtype)
        cur_cpred = np.zeros_like(data.targets, dtype=target.dtype)
        parameters = {
            # Both running predictions start from the configured initial value.
            'pred': np.full_like(data.targets, self.conf.init),
            'cpred': np.full_like(data.targets, self.conf.init),
            'target': target,
            'features': to_row_major(data.features),
            'treatment': to_row_major(data.treatment, np.int32),
            # Only binary treatment is supported!
            'eta': np.zeros([data.targets.shape[0], self.info.n_treatment - 1]),
            'out': (cur_pred, cur_cpred),
        }
        return parameters

    def check_data(self, data: "Dataset"):
        """
        Assert that the dataset is consistent with the configured data info.

        Arguments:
            data: Dataset object containing the input features, targets, and treatment.
        Raises:
            AssertionError: If features, targets and treatment disagree on the
                number of rows, if the number of target columns differs from
                ``n_period``, or if ``treat_dt`` is not smaller than ``n_period``.
        """
        n = data.features.shape[0]
        assert (n == data.targets.shape[0]) and (
            data.treatment.shape[0] == n
        ), 'You should guarantee the number of features, outcomes and treatment are the same!'
        assert (
            data.targets.shape[1] == self.info.n_period
        ), f'The number of outcomes({data.targets.shape[1]}) does\'t equals to {self.info.n_period}!'
        assert self.info.n_period > self.info.treat_dt, 'treat_dt should less than n_period!'

    def tr_val(self, data: "Dataset", subsample=None, **kwargs):
        """
        Split the dataset into training and validation sets.

        Arguments:
            data: Dataset object containing the input features, targets, and treatment.
            subsample: Ratio of instances to include in the training set. If None, uses ``self.info.instance_ratio``.
        Returns:
            Tuple containing the training dataset, validation dataset, indices of training instances, and indices
            of validation instances. When ``subsample >= 1`` no split is made: the full dataset is returned for
            both roles together with ``slice(None)`` for both index entries.
        """
        n = len(data)
        if subsample is None:
            subsample = self.info.instance_ratio
        if subsample >= 1:
            return data, data, slice(None), slice(None)
        tr_n = int(n * subsample)
        shuffled = np.random.permutation(n).astype(np.int32)
        tr_idx, val_idx = shuffled[:tr_n], shuffled[tr_n:]
        return data.sub_dataset(tr_idx), data.sub_dataset(val_idx), tr_idx, val_idx

    def postprocess(self):
        """
        Keep only the trees up to and including the round with the lowest validation loss.

        Does nothing when no validation loss has been recorded (for example
        when ``n_estimators`` is 0), since there is no best round to pick.
        """
        if len(self.valid_losses) == 0:
            return
        opt_n = np.argmin(self.valid_losses)
        self.trees = self.trees[: opt_n + 1]

    def _validation(self, target, prediction, cprediction=None):
        """
        Compute the validation losses for one round and record their combination.

        Arguments:
            target: Observed outcomes of the validation rows.
            prediction: Running prediction, scored on the columns from ``treat_dt`` onwards.
            cprediction: Second running prediction, scored on the columns before ``treat_dt``; may be None.
        Returns:
            Tuple ``(preloss, postloss)``. ``preloss`` is 0 when ``treat_dt`` is not positive or
            ``cprediction`` is None. ``preloss + coefficient * postloss`` is appended to ``self.valid_losses``.
        """
        op_loss = Loss.new_instance(self.conf.tree)
        treat_dt = self.info.treat_dt
        preloss = 0
        if treat_dt > 0 and cprediction is not None:
            preloss = op_loss.loss(target[:, :treat_dt], cprediction[:, :treat_dt]).mean()
        postloss = op_loss.loss(target[:, treat_dt:], prediction[:, treat_dt:]).mean()
        self.valid_losses.append(preloss + self.conf.tree.coefficient * postloss)
        return preloss, postloss

    def _update_paramers(self, *args, **kwargs):
        """
        Add the shrunken output of a newly fitted tree to the running state.

        Expects the keyword arguments ``tree``, ``features``, ``treatment``,
        ``out``, ``pred`` and ``cpred`` (and optionally ``eta``); other keywords
        are ignored. ``pred``, ``cpred`` and ``eta`` are updated in place.

        Returns:
            Dictionary with the updated ``pred``, ``cpred`` and ``eta``.
        """
        tree = kwargs.pop('tree')
        features = kwargs.pop('features')
        treatment = kwargs.pop('treatment')
        out = kwargs.pop('out')
        pred = kwargs.pop('pred')
        cpred = kwargs.pop('cpred')
        eta = kwargs.pop('eta', None)
        cur_pred, cur_cpred, cur_eta = tree.predict(features, treatment, key='cf_outcomes', out=out)
        # In-place updates so that the arrays held by the caller change too.
        pred += self.learning_rate * cur_pred
        cpred += self.learning_rate * cur_cpred
        if cur_eta is not None:
            eta += self.learning_rate * cur_eta[:, :1]
        return {'pred': pred, 'cpred': cpred, 'eta': eta}

    def early_stopping(self) -> bool:
        """
        Check if early stopping criteria is met based on the validation losses.

        Boosting stops when the latest loss exceeds ``min_loss * (1 + tol)`` and at
        least ``n_iter_no_change`` rounds have passed since the best round, or when
        the root of the most recent tree is a leaf. A non-scalar ``n_iter_no_change``
        (such as None) disables the first criterion.

        Returns:
            True if early stopping criteria is met, False otherwise (including
            when no validation loss has been recorded yet).
        """
        if len(self.valid_losses) == 0:
            return False
        # Uses the combined pre/post loss recorded by `_validation`.
        min_loss = min(self.valid_losses)
        cur_loss = self.valid_losses[-1]
        no_change_steps = self.n_iter_no_change if np.isscalar(self.n_iter_no_change) else np.inf
        n, opt_n = len(self.valid_losses), np.argmin(self.valid_losses)
        rounds_since_best = n - opt_n - 1
        if cur_loss > min_loss * (1 + self.tol) and no_change_steps <= rounds_since_best:
            return True
        if len(self.trees) > 0 and self.trees[-1].root.is_leaf:
            INFO('The last tree is no more splitting!')
            return True
        self.opt_step = opt_n
        if n - opt_n > 1 and self.verbose is True:
            INFO(f'{opt_n}-th has min loss ({min_loss:.3}), no change steps: {n - opt_n - 1}')
        return False

    def _outcome_difference(self, x):
        """
        Return, per row and period, the boosted outcome at index 1 minus the one at index 0.

        Arguments:
            x: Row-major feature matrix.
        Returns:
            Array of shape ``(n_rows, n_period)``.
        """
        results = np.zeros([x.shape[0], len(self.trees), 2, self.info.n_period], dtype=np.float64)
        predict([tree.export()[0] for tree in self.trees], x, results, 'outcomes')
        # Sum over the tree axis, then apply the shrinkage used during fitting.
        totals = self.learning_rate * results.sum(axis=1)
        # NOTE(review): presumably index 1 is the treated arm and index 0 the control arm - confirm.
        return totals[:, 1] - totals[:, 0]

    def predict(self, X, key: str, *, data: "Dataset" = None):
        """
        Predict the output using the trained model on the input data.

        Arguments:
            X: Feature matrix of the input data. When None, ``data.features`` is used.
            key: Type of prediction, can be 'leaf_id', 'effect', or 'effect-ND'.
            data: Dataset object containing the feature data.
        Returns:
            Prediction result based on the specified key:
            'leaf_id' gives an integer array of shape ``(n_rows, n_trees)``;
            'effect' gives the outcome difference for the periods from ``treat_dt``
            onwards, minus its mean over the earlier periods when ``treat_dt > 0``;
            'effect-ND' gives the outcome difference at period ``treat_dt`` only.
        Raises:
            RuntimeError: If the specified key is unknown and not supported.
        """
        # Reject unsupported keys before doing any data conversion.
        if key not in ('leaf_id', 'effect', 'effect-ND'):
            raise RuntimeError(f'Only `leaf_id`, `effect` and `effect-ND` are supported, but {key} is unknown!')
        if X is None:
            X = data.features
        x = to_row_major(X)
        if key == 'leaf_id':
            # np.float64 instead of the `np.float` alias, which NumPy >= 1.24 no longer provides.
            leaf_ids = np.zeros([x.shape[0], len(self.trees)], dtype=np.float64)
            predict([tree.export()[0] for tree in self.trees], x, leaf_ids, 'leaf_id')
            return leaf_ids.astype(int)
        tau_hat = self._outcome_difference(x)
        treat_dt = self.info.treat_dt
        if key == 'effect-ND':
            return tau_hat[:, treat_dt]
        # key == 'effect'
        if treat_dt > 0:
            return tau_hat[:, treat_dt:] - tau_hat[:, :treat_dt].mean(axis=1, keepdims=True)
        return tau_hat[:, treat_dt:]

    def effect(self, X=None, *, data: "Dataset" = None):
        """Predict the treatment effect on the input data."""
        return self.predict(X, 'effect', data=data)

    def split_counts(self, trees=None, feature_names=None):
        """
        Count the number of splits made on each feature in the gradient boost causal trees.

        Arguments:
            trees: List of decision trees. If None, uses the trained trees.
            feature_names: List of feature names. If None, uses ``self.info.feature_columns``.
        Returns:
            Dictionary with feature names as keys and the corresponding split counts as values.
        """
        if feature_names is None:
            feature_names = self.info.feature_columns
        if trees is None:
            trees = self.trees
        counts = np.zeros([len(feature_names)], dtype=int)
        for tree in trees:
            for node in tree.nodes:
                # NOTE(review): every node is counted; if leaves carry a negative
                # split_feature they would be attributed to a trailing feature - confirm.
                counts[node.split_feature] += 1
        return dict(zip(feature_names, counts))