# Copyright 2023 AntGroup CO., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
from typing import List, Optional
import tensorflow as tf
from tensorflow.python.framework import tensor_shape
from openasce.utils.logger import logger
[docs]class DNNLayer(tf.keras.layers.Layer):
"""Building a MLP/DNN Layer.
Layer: dense dnn layer
inputs:
2d tensor (batch_size, input_dim)
outputs:
2d tensor (batch_size, output_dim)
"""
[docs] def __init__(
self,
hidden_units: Optional[List] = None,
activation="relu",
l1_reg=1e-4,
l2_reg=1e-4,
dropout_rate=0,
use_bn=False,
apply_final_act=True,
seed=1024,
**kwargs
):
"""Initialize DNNLayer.
Args:
hidden_units: list of positive integer, the layer number and units in each layer.
activation: Activation function to use.
l1_reg: float between 0 and 1. L1 regularizer strength applied to the kernel weights matrix.
l2_reg: float between 0 and 1. L2 regularizer strength applied to the kernel weights matrix.
dropout_rate: float in [0,1). Fraction of the units to dropout.
use_bn: bool. Whether use BatchNormalization before activation or not.
apply_final_act: whether to apply act in final layer
seed: A Python integer to use as random seed.
"""
self.hidden_units = hidden_units
self.activation = activation
self.dropout_rate = dropout_rate
self.seed = seed
self.l1_reg = l1_reg
self.l2_reg = l2_reg
self.use_bn = use_bn
self.apply_final_act = apply_final_act
self.hidden_outputs = []
super(DNNLayer, self).__init__(**kwargs)
[docs] def build(self, input_shape):
input_size = input_shape[-1]
hidden_units = [int(input_size)] + list(self.hidden_units)
self.kernels = [
self.add_weight(
name="kernel" + str(i),
shape=[hidden_units[i], hidden_units[i + 1]],
initializer=tf.keras.initializers.he_normal(seed=self.seed),
regularizer=tf.keras.regularizers.l1_l2(self.l1_reg, self.l2_reg),
trainable=True,
)
for i in range(len(self.hidden_units))
]
self.bias = [
self.add_weight(
name="bias" + str(i),
shape=[
self.hidden_units[i],
],
initializer=tf.keras.initializers.Zeros(),
trainable=True,
)
for i in range(len(self.hidden_units))
]
if self.use_bn:
self.bn_layers = [
tf.keras.layers.BatchNormalization(name="bn_layer_{}".format(i))
for i in range(len(self.hidden_units))
]
if self.dropout_rate is not None and self.dropout_rate > 0:
self.dropout_layers = [
tf.keras.layers.Dropout(
self.dropout_rate,
seed=self.seed + i,
name="dropout_layer_{}".format(i),
)
for i in range(len(self.hidden_units))
]
self.activation_layers = [
tf.keras.layers.Activation(self.activation, name="act_layer_{}".format(i))
for i in range(len(self.hidden_units))
]
super(DNNLayer, self).build(input_shape)
[docs] def call(self, inputs, training=None, **kwargs):
"""Calls the layer on new inputs.
Args:
inputs: 2d tensor (batch_size, dim_1), deep features.
Returns:
2d tensor (batch_size, out_dim).
"""
deep_output = inputs
for i in range(len(self.hidden_units)):
fc = tf.nn.bias_add(
tf.tensordot(deep_output, self.kernels[i], axes=(-1, 0)), self.bias[i]
)
if self.use_bn:
fc = self.bn_layers[i](fc, training=training)
if i < len(self.hidden_units) - 1 or self.apply_final_act:
fc = self.activation_layers[i](fc)
if self.dropout_rate is not None and self.dropout_rate > 0:
fc = self.dropout_layers[i](fc, training=training)
deep_output = fc
self.hidden_outputs.append(fc)
return deep_output
[docs] def compute_output_shape(self, input_shape):
input_shape = tensor_shape.TensorShape(input_shape)
input_shape = input_shape.with_rank_at_least(2)
if tensor_shape.dimension_value(input_shape[-1]) is None:
raise ValueError(
"The innermost dimension of input_shape must be defined, but saw: %s"
% input_shape
)
elif len(self.hidden_units) > 0:
return input_shape[:-1].concatenate(self.hidden_units[-1])
else:
return input_shape
[docs] def get_config(self):
config = {
"activation": self.activation,
"hidden_units": self.hidden_units,
"l1_reg": self.l1_reg,
"l2_reg": self.l2_reg,
"use_bn": self.use_bn,
"dropout_rate": self.dropout_rate,
"apply_final_act": self.apply_final_act,
"seed": self.seed,
}
base_config = super(DNNLayer, self).get_config()
config.update(base_config)
return config
[docs]class DNNModel(tf.keras.Model):
"""Building a DNN model.
Model: DNN or MLP.
inputs:
2d tensor (batch_size, input_dim).
outputs:
2d tensor (batch_size, output_dim).
"""
[docs] def __init__(
self,
hidden_units,
act_fn="relu",
l1_reg=1e-4,
l2_reg=1e-4,
dropout_rate=0,
use_bn=False,
seed=1024,
apply_final_act=False,
name="DNNModel",
):
"""Initialize DNNModel.
Args:
hidden_units: list, unit in each hidden layer.
act_fn: string, activation function.
l2_reg: float, regularization value.
dropout_rate: float, fraction of the units to dropout.
use_bn: boolean, if True, apply BatchNormalization in each hidden layer.
seed: int, random value for initialization.
"""
super(DNNModel, self).__init__(name="DNNModel")
self.dnn_layer = DNNLayer(
hidden_units=hidden_units,
activation=act_fn,
l1_reg=l1_reg,
l2_reg=l2_reg,
dropout_rate=dropout_rate,
use_bn=use_bn,
apply_final_act=apply_final_act,
seed=seed,
name="{}_dnn_layer".format(name),
)
[docs] def call(self, inputs, training=None):
"""Calls the model on new inputs.
Args:
inputs: 2d tensor (batch_size, dim_1), deep features.
wide_input: 2d tensor (batch_size, dim_2), wide features.
Returns:
2d tensor (batch_size, out_dim).
"""
dnn_output = self.dnn_layer(inputs, training=training)
return dnn_output
[docs]class MultiTaskDNNModel(tf.keras.Model):
"""Building a multi task dnn model.
Model: MultiTaskDNN.
inputs:
2d tensor (batch_size, input_dim).
outputs:
list odf 2d tensor [(batch_size, output_dim),..].
"""
[docs] def __init__(
self,
hidden_units,
num_tasks,
task_hidden_units,
act_fn="relu",
l1_reg=1e-4,
l2_reg=1e-4,
dropout_rate=0,
use_bn=False,
seed=1024,
apply_final_act=False,
task_apply_final_act=False,
name="MultiTaskDNNModel",
):
"""Initialize MultiTaskDNNModel.
Args:
hidden_units: list, unit in each hidden layer.
act_fn: string, activation function.
num_tasks: int, number of the task.
l2_reg: float, regularization value.
dropout_rate: float, fraction of the units to dropout.
use_bn: boolean, if True, apply BatchNormalization in each hidden layer.
seed: int, random value for initialization.
"""
super(MultiTaskDNNModel, self).__init__(name="MultiTaskDNNModel")
self.num_tasks = num_tasks
self.dnn_layer = DNNLayer(
hidden_units=hidden_units,
activation=act_fn,
l1_reg=l1_reg,
l2_reg=l2_reg,
dropout_rate=dropout_rate,
use_bn=use_bn,
apply_final_act=apply_final_act,
seed=seed,
name="{}_dnn_layer".format(name),
)
self.task_layers = []
for i in range(self.num_tasks):
self.task_layers.append(
DNNLayer(
hidden_units=task_hidden_units,
activation=act_fn,
l1_reg=l1_reg,
l2_reg=l2_reg,
dropout_rate=dropout_rate,
use_bn=use_bn,
apply_final_act=task_apply_final_act,
seed=seed,
)
)
[docs] def call(self, inputs, training=None):
"""Calls the model on new inputs.
Args:
inputs: 2d tensor (batch_size, dim_1), deep features.
wide_input: 2d tensor (batch_size, dim_2), wide features.
Returns:
list of 2d tensor [(batch_size, output_dim),..].
"""
task_outputs = []
task_inputs = self.dnn_layer(inputs, training=training)
for i in range(self.num_tasks):
task_outputs.append(self.task_layers[i](task_inputs, training=training))
return task_outputs
[docs]class FMLayer(tf.keras.layers.Layer):
"""Building a FM Layer.
Model: Factorization Machine.
Paper: Factorization Machine models pairwise (order-2) feature interactions without linear term and bias.
Link: https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf.
Author: Steffen Rendle
Input shape
- 3D tensor with shape: ``(batch_size,field_size,embedding_size)``.
Output shape
- 2D tensor with shape: ``(batch_size, dim)``.
"""
[docs] def __init__(self, **kwargs):
super(FMLayer, self).__init__(**kwargs)
[docs] def build(self, input_shape):
if len(input_shape) != 3:
raise ValueError(
"Unexpected inputs dimensions % d, expect to be 3 dimensions"
% (len(input_shape))
)
# Be sure to call this somewhere!
super(FMLayer, self).build(input_shape)
[docs] def call(self, inputs, **kwargs):
"""Calls the layer on new inputs.
Args:
input_shape: Shape tuple (tuple of integers) or list of shape tuples (one per output tensor of the layer).
"""
if tf.keras.backend.ndim(inputs) != 3:
raise ValueError(
"Unexpected inputs dimensions %d, expect to be 3 dimensions"
% (tf.keras.backend.ndim(inputs))
)
fm_inputs = inputs
squared_sum = tf.keras.backend.square(
tf.keras.backend.sum(fm_inputs, axis=1, keepdims=True)
)
# squared_sum (batch, dim)
logger.debug("squared_sum shape {}".format(squared_sum.shape))
sum_squared = tf.keras.backend.sum(fm_inputs * fm_inputs, axis=1, keepdims=True)
logger.debug("sum_squared shape {}".format(sum_squared.shape))
output = tf.keras.layers.Lambda(lambda x: 0.5 * tf.subtract(x[0], x[1]))(
[squared_sum, sum_squared]
)
output = tf.keras.backend.squeeze(output, axis=1)
logger.debug("output {}".format(output))
return output
[docs] def compute_output_shape(self, input_shape):
# return (input_shape[0], input_shape[-1])
input_shape = tensor_shape.TensorShape(input_shape)
input_shape = input_shape.with_rank_at_least(2)
if tensor_shape.dimension_value(input_shape[-1]) is None:
raise ValueError(
"The innermost dimension of input_shape must be defined, but saw: %s"
% input_shape
)
return input_shape[:1].concatenate(input_shape[-1])
SQRT_CONST = 1e-3
[docs]def mmd_rbf(X, t, p, sig):
"""Computes the l2-RBF MMD for X given t
Paper: Estimating individual treatment effect: generalization bounds and algorithms.
Gaussian kernel(RBF): https://en.wikipedia.org/wiki/Radial_basis_function_kernel
Args:
X: embeddings.
t: 1 or 0. Distinguish the treatment group and the control group.
p: the proportion of the number of treatment instances / the number of all instances.
sig: kernel width.
"""
it = tf.where(t > 0)[:, 0]
ic = tf.where(t < 1)[:, 0]
Xc = tf.gather(X, ic)
Xt = tf.gather(X, it)
Kcc = tf.exp(-pdist2sq(Xc, Xc) / tf.square(sig))
Kct = tf.exp(-pdist2sq(Xc, Xt) / tf.square(sig))
Ktt = tf.exp(-pdist2sq(Xt, Xt) / tf.square(sig))
m = tf.compat.v1.to_float(tf.shape(Xc)[0])
n = tf.compat.v1.to_float(tf.shape(Xt)[0])
mmd = tf.square(1.0 - p) / (m * (m - 1.0)) * (tf.reduce_sum(Kcc) - m)
mmd = mmd + tf.square(p) / (n * (n - 1.0)) * (tf.reduce_sum(Ktt) - n)
mmd = mmd - 2.0 * p * (1.0 - p) / (m * n) * tf.reduce_sum(Kct)
mmd = 4.0 * mmd
return mmd
[docs]def pdist2sq(X, Y):
"""Computes the squared Euclidean distance between all pairs x in X, y in Y"""
C = -2 * tf.matmul(X, tf.transpose(Y))
nx = tf.compat.v1.reduce_sum(tf.square(X), 1, keep_dims=True)
ny = tf.compat.v1.reduce_sum(tf.square(Y), 1, keep_dims=True)
D = (C + tf.transpose(ny)) + nx
return D