Add a problem.
Args:
- problem_name (str): problem name.
- problem_type (str, optional): One of the following problem types: ['cls', 'seq_tag', 'seq2seq_tag', 'seq2seq_text', 'multi_cls', 'pretrain']. Defaults to 'cls'.
- processing_fn (Callable, optional): preprocessing function. Defaults to None.
Raises:
- ValueError: unexpected problem_type
params.register_problem(problem_name='toy_cls', problem_type='cls', processing_fn=toy_cls)
params.register_problem(problem_name='toy_seq_tag', problem_type='seq_tag', processing_fn=toy_seq_tag)
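The validation described in the docstring can be pictured with a small stand-alone registry. This is only a sketch under assumptions: `ToyRegistry` and its attributes are hypothetical names and not the library's implementation. Only the list of problem types and the `ValueError` come from the docstring above.

```python
from typing import Callable, Dict, Optional

# Problem types listed in the docstring above.
VALID_PROBLEM_TYPES = ['cls', 'seq_tag', 'seq2seq_tag',
                       'seq2seq_text', 'multi_cls', 'pretrain']


class ToyRegistry:
    """Hypothetical stand-in for a problem registry (not the m3tl class)."""

    def __init__(self) -> None:
        self.problem_type: Dict[str, str] = {}
        self.processing_fn: Dict[str, Optional[Callable]] = {}

    def register_problem(self, problem_name: str, problem_type: str = 'cls',
                         processing_fn: Optional[Callable] = None) -> None:
        # Reject unknown problem types, mirroring the documented ValueError.
        if problem_type not in VALID_PROBLEM_TYPES:
            raise ValueError(
                'unexpected problem_type {!r}, expected one of {}'.format(
                    problem_type, VALID_PROBLEM_TYPES))
        self.problem_type[problem_name] = problem_type
        self.processing_fn[problem_name] = processing_fn


registry = ToyRegistry()
registry.register_problem('toy_cls', problem_type='cls')
```

Validating at registration time surfaces a misspelled problem type immediately, rather than later when the model is built.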
Add multiple problems.
processing_fn_dict is optional. If it is not provided, the preprocessing function of every problem is set to None.
Args:
- problem_type_dict (Dict[str, str]): mapping from problem name to problem type
- processing_fn_dict (Dict[str, Callable], optional): mapping from problem name to preprocessing function. Defaults to None.
problem_type_dict = {'toy_cls': 'cls', 'toy_seq_tag': 'seq_tag'}
processing_fn_dict = {'toy_cls': toy_cls, 'toy_seq_tag': toy_seq_tag}
params.register_multiple_problems(problem_type_dict=problem_type_dict, processing_fn_dict=processing_fn_dict)
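The defaulting rule for processing_fn_dict can be sketched as below. `pair_problems_with_fns` is a hypothetical helper written for illustration, assuming that a missing dictionary simply means "no preprocessing function for any problem". It is not taken from the library.

```python
from typing import Callable, Dict, List, Optional, Tuple


def pair_problems_with_fns(
        problem_type_dict: Dict[str, str],
        processing_fn_dict: Optional[Dict[str, Callable]] = None
) -> List[Tuple[str, str, Optional[Callable]]]:
    """Hypothetical helper: return (name, type, fn) triples."""
    # A missing dict is treated as "no preprocessing function for any problem".
    processing_fn_dict = processing_fn_dict or {}
    return [(name, problem_type, processing_fn_dict.get(name))
            for name, problem_type in problem_type_dict.items()]


triples = pair_problems_with_fns({'toy_cls': 'cls', 'toy_seq_tag': 'seq_tag'})
```

Each triple could then be passed to a single-problem registration call, one problem at a time.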
Assign the problems that will actually be run to params. This function does the following:
- parse the flag string to form the run_problem_list
- create checkpoint saving path
- calculate total number of training data and training steps
- scale the learning rate linearly with the number of GPUs
Arguments:
- flag_string {str} -- run problem string
- example: cws|POS|weibo_ner&weibo_cws
Keyword Arguments:
- gpu {int} -- number of GPUs used for training; this affects the number of training steps and the learning rate (default: {2})
- base_dir {str} -- base directory for checkpoints; if None, "models" is used (default: {None})
- dir_name {str} -- directory name for checkpoints; if None, one is created automatically (default: {None})
- predicting {bool} -- whether the problems are being assigned for prediction rather than training
params.assign_problem(flag_string='toy_seq_tag|toy_cls', base_dir=tmp_model_dir)
assert params.problem_assigned
After the problem is assigned, the model path should have been created, with the tokenizer and label encoder files in it.
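The linear learning-rate scaling mentioned in the list of steps can be sketched as follows. `scale_learning_rate` and `base_lr` are hypothetical names, and treating a CPU-only run (0 GPUs) like a single device is an assumption made here so that the rate never becomes zero. The library may handle that case differently.

```python
def scale_learning_rate(base_lr: float, num_gpus: int) -> float:
    """Hypothetical helper: scale the learning rate linearly with GPU count."""
    # Assumption: 0 GPUs (CPU-only) is treated like a single device.
    return base_lr * max(num_gpus, 1)


lr_two_gpus = scale_learning_rate(2e-5, num_gpus=2)
```

Linear scaling keeps the learning rate in proportion to the effective batch size, which grows with the number of devices.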
Register new problem type
You can also implement your own problem type. Essentially, a problem type has:
- name
- top layer
- label handling function
- label encoder creating function
Here we register a vector fitting (vector annealing) problem type as an example.
Note: This was originally designed as an internal API for development, so it is not user-friendly.
API to register a new problem type
Args:
- problem_type: string, problem type name
- top_layer: a Keras model with some specific requirements
- label_handling_fn: function to convert labels to label ids
- get_or_make_label_encoder_fn: function to create label encoder, num_classes has to be specified here
from m3tl.problem_types.utils import BaseTop
from m3tl.problem_types.utils import empty_tensor_handling_loss, nan_loss_handling
import tensorflow as tf
from typing import Tuple, Dict
import numpy as np
# top layer
class VectorFit(BaseTop):
    def __init__(self, params: Params, problem_name: str) -> None:
        super(VectorFit, self).__init__(
            params=params, problem_name=problem_name)
        self.num_classes = self.params.num_classes[problem_name]
        self.dense = tf.keras.layers.Dense(self.num_classes)

    def call(self, inputs: Tuple[Dict], mode: str):
        feature, hidden_feature = inputs
        pooled_hidden = hidden_feature['pooled']
        logits = self.dense(pooled_hidden)
        if mode != tf.estimator.ModeKeys.PREDICT:
            # this is the same as the label_id returned by vector_fit_label_handling_fn
            label = feature['{}_label_ids'.format(self.problem_name)]
            # cosine_wrapper is not defined in this snippet; it must already be in scope
            loss = empty_tensor_handling_loss(label, logits, cosine_wrapper)
            loss = nan_loss_handling(loss)
            self.add_loss(loss)
            self.add_metric(tf.math.negative(
                loss), name='{}_cos_sim'.format(self.problem_name), aggregation='mean')
        return logits
# label handling fn
def vector_fit_label_handling_fn(target, label_encoder=None, tokenizer=None, decoding_length=None):
    # no need to encode labels; return the array directly
    # returns label_id and label mask
    label_id = np.array(target, dtype='float32')
    return label_id, None
# make label encoder
def vector_fit_get_or_make_label_encoder_fn(params: Params, problem, mode, label_list):
    # don't need to make label encoder here
    # set params num_classes for this problem
    label_array = np.array(label_list)
    params.num_classes[problem] = label_array.shape[-1]
    return None
params.register_problem_type(problem_type='vectorfit', top_layer=VectorFit, label_handling_fn=vector_fit_label_handling_fn, get_or_make_label_encoder_fn=vector_fit_get_or_make_label_encoder_fn)
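The top layer reports the negated loss as a metric named cos_sim, which suggests the loss is a negative cosine similarity between label vectors and predicted vectors. The definition of cosine_wrapper is not shown here, so the NumPy function below is only an illustrative assumption of that kind of loss. `negative_cosine_similarity` and `eps` are hypothetical names.

```python
import numpy as np


def negative_cosine_similarity(labels: np.ndarray, logits: np.ndarray,
                               eps: float = 1e-12) -> np.ndarray:
    """Hypothetical loss: minus the cosine similarity of each row pair."""
    # L2-normalise each row; eps guards against division by zero.
    labels_norm = labels / np.maximum(
        np.linalg.norm(labels, axis=-1, keepdims=True), eps)
    logits_norm = logits / np.maximum(
        np.linalg.norm(logits, axis=-1, keepdims=True), eps)
    return -np.sum(labels_norm * logits_norm, axis=-1)


labels = np.array([[1.0, 0.0], [0.0, 2.0]], dtype='float32')
logits = np.array([[3.0, 0.0], [0.0, -1.0]], dtype='float32')
loss = negative_cosine_similarity(labels, logits)
cos_sim = -loss  # negating the loss recovers the similarity itself
```

Under this assumption, minimising the loss pushes each predicted vector towards the direction of its label vector, regardless of magnitude.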
Parse problem string
Arguments:
- flag_string {str} -- problem string
Returns:
- list -- problem list
print('chained with |: ', params.parse_problem_string('toy_seq_tag|toy_cls'))
print('chained with &: ', params.parse_problem_string('toy_seq_tag&toy_cls'))
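The two separators can be illustrated with a plain string split. This sketch assumes that `|` separates independent chunks and `&` joins problems within one chunk. The structure returned by the real method is not shown here and may differ, for example by carrying problem types. `parse_problem_string_sketch` is a hypothetical name.

```python
from typing import List


def parse_problem_string_sketch(flag_string: str) -> List[List[str]]:
    """Hypothetical parser: '|' separates chunks, '&' joins problems in a chunk."""
    return [chunk.split('&') for chunk in flag_string.split('|')]


chunks = parse_problem_string_sketch('cws|POS|weibo_ner&weibo_cws')
```

With the example string from the docstring of the assignment function, this yields three chunks, the last of which holds two problems.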
params.get_problem_type('toy_seq_tag')
If the batch_size is dynamic, we have to loop through the tf.data.Dataset to get the exact number of training steps. In that case, we need a function to update train_steps, which is used to compute the learning rate schedule.
WARNING: this update must be called before the model is compiled!
Args:
- train_steps_per_epoch (int): new number of training steps per epoch
params.update_train_steps(train_steps_per_epoch=100)
print(params.train_steps, params.num_warmup_steps)
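Counting steps under dynamic batching and deriving schedule lengths from the count can be sketched in plain Python. Both helpers are hypothetical, and the `epochs` and `warmup_ratio` parameters are assumptions used for illustration. They are not a description of how the library computes `train_steps` and `num_warmup_steps`.

```python
from typing import Iterable, Sequence, Tuple


def count_train_steps(batches: Iterable[Sequence]) -> int:
    """Count steps per epoch by exhausting the batch iterator once."""
    return sum(1 for _ in batches)


def schedule_lengths(train_steps_per_epoch: int, epochs: int,
                     warmup_ratio: float) -> Tuple[int, int]:
    """Hypothetical helper: return (total train steps, warmup steps)."""
    train_steps = train_steps_per_epoch * epochs
    return train_steps, int(train_steps * warmup_ratio)


# Batches of varying size, as dynamic batching would produce.
dynamic_batches = [[1, 2, 3], [4], [5, 6]]
steps_per_epoch = count_train_steps(dynamic_batches)
train_steps, num_warmup_steps = schedule_lengths(
    steps_per_epoch, epochs=10, warmup_ratio=0.1)
```

Because the number of batches cannot be derived from the row count when batch sizes vary, the iterator has to be consumed once to obtain it.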
Set data sampling strategy for multi-task learning.
'data_balanced' and 'problem_balanced' are implemented by default:
- data_balanced: the sampling weight of a problem chunk equals its number of rows.
- problem_balanced: the sampling weight equals 1 for every problem chunk.
Args:
- sampling_strategy_name (str, optional): sampling strategy. Defaults to 'data_balanced'.
- sampling_strategy_fn (Callable, optional): function to create weight dict. Defaults to None.
Raises:
- NotImplementedError: sampling_strategy_fn is not implemented yet
- ValueError: invalid sampling_strategy provided
Returns:
- Dict[str, float]: sampling weight for each problem_chunk
params.assign_data_sampling_strategy(sampling_strategy_name='problem_balanced')
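The two built-in strategies can be sketched as a function from row counts to weights. `sampling_weights` and `rows_per_chunk` are hypothetical names, and the row counts are made-up illustrative values. Only the two weighting rules and the `ValueError` for an invalid strategy come from the docstring above.

```python
from typing import Dict


def sampling_weights(rows_per_chunk: Dict[str, int],
                     strategy: str = 'data_balanced') -> Dict[str, float]:
    """Hypothetical helper: sampling weight for each problem chunk."""
    if strategy == 'data_balanced':
        # Weight equals the number of rows in the chunk.
        return {chunk: float(rows) for chunk, rows in rows_per_chunk.items()}
    if strategy == 'problem_balanced':
        # Every chunk gets the same weight.
        return {chunk: 1.0 for chunk in rows_per_chunk}
    raise ValueError('invalid sampling strategy: {}'.format(strategy))


rows = {'toy_cls': 100, 'toy_seq_tag': 300}  # illustrative row counts
data_balanced = sampling_weights(rows)
problem_balanced = sampling_weights(rows, 'problem_balanced')
```

With data_balanced, larger datasets are sampled more often. With problem_balanced, every chunk is sampled equally often, so small datasets are revisited more frequently.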