class Params

Params() :: BaseParams

Add Problems

BaseParams.register_problem

BaseParams.register_problem(problem_name:str, problem_type='cls', processing_fn:Callable=None)

Register a problem with its problem type and an optional preprocessing function.

Args:

  • problem_name (str): problem name.
  • problem_type (str, optional): One of the following problem types: ['cls', 'seq_tag', 'seq2seq_tag', 'seq2seq_text', 'multi_cls', 'pretrain']. Defaults to 'cls'.
  • processing_fn (Callable, optional): preprocessing function. Defaults to None.

Raises:

  • ValueError: unexpected problem_type
params.register_problem(problem_name='toy_cls', problem_type='cls', processing_fn=toy_cls)
params.register_problem(problem_name='toy_seq_tag', problem_type='seq_tag', processing_fn=toy_seq_tag)
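The validation described under "Raises" can be illustrated with a small stand-alone sketch. `ToyRegistry` and its attributes are hypothetical names used only for illustration; they are not part of m3tl, and the real implementation may store more state than shown here.

```python
from typing import Callable, Dict, Optional

# Problem types listed in the documentation above.
VALID_PROBLEM_TYPES = ['cls', 'seq_tag', 'seq2seq_tag',
                       'seq2seq_text', 'multi_cls', 'pretrain']


class ToyRegistry:
    """Hypothetical stand-in for the registration part of Params."""

    def __init__(self) -> None:
        self.problem_type: Dict[str, str] = {}
        self.processing_fn: Dict[str, Optional[Callable]] = {}

    def register_problem(self, problem_name: str, problem_type: str = 'cls',
                         processing_fn: Optional[Callable] = None) -> None:
        # Reject unknown problem types early, mirroring the documented ValueError.
        if problem_type not in VALID_PROBLEM_TYPES:
            raise ValueError(
                'unexpected problem_type {!r}, expected one of {}'.format(
                    problem_type, VALID_PROBLEM_TYPES))
        self.problem_type[problem_name] = problem_type
        self.processing_fn[problem_name] = processing_fn


registry = ToyRegistry()
registry.register_problem('toy_cls', problem_type='cls')

try:
    registry.register_problem('toy_bad', problem_type='regression')
except ValueError as err:
    error_message = str(err)
```

Failing at registration time keeps a misspelled problem type from surfacing later, during data processing or model building.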

BaseParams.register_multiple_problems

BaseParams.register_multiple_problems(problem_type_dict:Dict[str, str], processing_fn_dict:Dict[str, Callable]=None)

Add multiple problems.

processing_fn_dict is optional. If it is not provided, the processing function of every problem is set to None.

Args:

  • problem_type_dict (Dict[str, str]): mapping from problem name to problem type.
  • processing_fn_dict (Dict[str, Callable], optional): mapping from problem name to preprocessing function. Defaults to None.
problem_type_dict = {'toy_cls': 'cls', 'toy_seq_tag': 'seq_tag'}
processing_fn_dict = {'toy_cls': toy_cls, 'toy_seq_tag': toy_seq_tag}
params.register_multiple_problems(problem_type_dict=problem_type_dict, processing_fn_dict=processing_fn_dict)
Adding new problem toy_cls, problem type: cls
Adding new problem toy_seq_tag, problem type: seq_tag
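Conceptually, registering multiple problems amounts to one registration per entry of problem_type_dict. The sketch below shows that expansion on plain dictionaries. `expand_registrations` is a hypothetical helper, and treating a name missing from processing_fn_dict as None is an assumption rather than documented behaviour.

```python
from typing import Callable, Dict, List, Optional, Tuple


def expand_registrations(
        problem_type_dict: Dict[str, str],
        processing_fn_dict: Optional[Dict[str, Callable]] = None
) -> List[Tuple[str, str, Optional[Callable]]]:
    """Return one (name, type, processing_fn) triple per problem."""
    # An absent dict means "no processing function for any problem".
    processing_fn_dict = processing_fn_dict or {}
    return [
        (name, problem_type, processing_fn_dict.get(name))
        for name, problem_type in problem_type_dict.items()
    ]


def toy_cls_fn():
    """Placeholder preprocessing function for the example."""
    return []


with_fns = expand_registrations(
    {'toy_cls': 'cls', 'toy_seq_tag': 'seq_tag'},
    {'toy_cls': toy_cls_fn})
without_fns = expand_registrations({'toy_cls': 'cls'})
```

Each triple corresponds to the arguments of a single register_problem call.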

Assign Problems

BaseParams.assign_problem

BaseParams.assign_problem(flag_string:str, model_dir:str=None, base_dir:str=None, dir_name:str=None, predicting=False)

Assign the problems to run to the params. This function does the following:

  1. parses the flag string to form the run_problem_list
  2. creates the checkpoint saving path
  3. calculates the total number of training examples and training steps
  4. scales the learning rate linearly with the number of GPUs

Arguments:

  • flag_string {str} -- run problem string, for example: cws|POS|weibo_ner&weibo_cws

Keyword Arguments:

  • gpu {int} -- number of GPUs used for training; this affects the training steps and learning rate (default: {2})
  • base_dir {str} -- base dir for checkpoints; if None, "models" is used (default: {None})
  • dir_name {str} -- dir name for checkpoints; if None, it is created automatically (default: {None})
  • predicting {bool} -- whether the params are being set up for prediction (default: {False})
params.assign_problem(flag_string='toy_seq_tag|toy_cls', base_dir=tmp_model_dir)
assert params.problem_assigned
WARNING:root:bert_config not exists. will load model from huggingface checkpoint.

After the problems are assigned, the model directory should exist and contain the tokenizer and label encoder files.
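Steps 3 and 4 of assign_problem (counting training steps and scaling the learning rate) can be sketched with generic arithmetic. The formulas and the function names below are assumptions for illustration only; the exact computation inside m3tl may differ, for example in how batches are counted per problem.

```python
import math


def estimate_train_steps(num_examples: int, batch_size: int, epochs: int) -> int:
    """Assumed formula: batches per epoch (rounded up) times number of epochs."""
    return math.ceil(num_examples / batch_size) * epochs


def scale_learning_rate(base_lr: float, num_gpus: int) -> float:
    """Linear scaling: multiply the base rate by the number of GPUs (at least 1)."""
    return base_lr * max(num_gpus, 1)


# 1000 examples with batch size 32 give 32 batches per epoch.
train_steps = estimate_train_steps(num_examples=1000, batch_size=32, epochs=3)
scaled_lr = scale_learning_rate(base_lr=2e-5, num_gpus=2)
```

Linear scaling follows the common heuristic that a larger effective batch (more GPUs) tolerates a proportionally larger learning rate.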

Register new problem type

You can also implement your own problem type. Essentially, a problem type has:

  • name
  • top layer
  • label handling function
  • label encoder creating function

Here we register a vector fitting (vector annealing) problem type as an example.

Note: This was originally designed as an internal API for development, so it is not user-friendly.

BaseParams.register_problem_type

BaseParams.register_problem_type(problem_type:str, top_layer:Model=None, label_handling_fn:Callable=None, get_or_make_label_encoder_fn:Callable=None, inherit_from:str=None, description:str=None)

API to register a new problem type

Args:

  • problem_type: string, problem type name
  • top_layer: a Keras model with some specific requirements
  • label_handling_fn: function to convert labels to label ids
  • get_or_make_label_encoder_fn: function to create the label encoder; num_classes has to be set here
from m3tl.problem_types.utils import BaseTop
from m3tl.problem_types.utils import empty_tensor_handling_loss, nan_loss_handling
import tensorflow as tf
from typing import Tuple, Dict
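import numpy as np

# Loss wrapper used by VectorFit.call below. It is not defined in the original
# snippet, so this definition is an assumption: the Keras cosine similarity loss
# (negative cosine similarity), with from_logits accepted only so that it can be
# called the same way as the other loss functions.
def cosine_wrapper(labels, logits, from_logits=True):
    return tf.keras.losses.cosine_similarity(labels, logits)

# top layer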
class VectorFit(BaseTop):
    def __init__(self, params: Params, problem_name: str) -> None:
        super(VectorFit, self).__init__(
            params=params, problem_name=problem_name)
        self.num_classes = self.params.num_classes[problem_name]
        self.dense = tf.keras.layers.Dense(self.num_classes)

    def call(self, inputs: Tuple[Dict], mode: str):
        feature, hidden_feature = inputs
        pooled_hidden = hidden_feature['pooled']

        logits = self.dense(pooled_hidden)
        if mode != tf.estimator.ModeKeys.PREDICT:
            # this is the same as the label_id returned by vector_fit_label_handling_fn
            label = feature['{}_label_ids'.format(self.problem_name)]

            loss = empty_tensor_handling_loss(label, logits, cosine_wrapper)
            loss = nan_loss_handling(loss)
            self.add_loss(loss)

            self.add_metric(tf.math.negative(
                loss), name='{}_cos_sim'.format(self.problem_name), aggregation='mean')
        return logits

# label handling fn
def vector_fit_label_handling_fn(target, label_encoder=None, tokenizer=None, decoding_length=None):
    # no need to encode labels; return the array directly
    # returns (label_id, label_mask); no mask is needed here
    label_id = np.array(target, dtype='float32')
    return label_id, None

# make label encoder
def vector_fit_get_or_make_label_encoder_fn(params: Params, problem, mode, label_list):
    # don't need to make label encoder here
    # set params num_classes for this problem
    label_array = np.array(label_list)
    params.num_classes[problem] = label_array.shape[-1]
    return None

params.register_problem_type(problem_type='vectorfit', top_layer=VectorFit, label_handling_fn=vector_fit_label_handling_fn, get_or_make_label_encoder_fn=vector_fit_get_or_make_label_encoder_fn)
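The VectorFit top layer reports the negated loss as a `cos_sim` metric, which suggests that the loss is a negative cosine similarity between the label vector and the predicted vector. The following pure-Python sketch shows that relationship on plain lists. `cosine_similarity_loss` is a hypothetical helper, not an m3tl or TensorFlow function.

```python
import math
from typing import Sequence


def cosine_similarity_loss(label: Sequence[float], prediction: Sequence[float]) -> float:
    """Negative cosine similarity: -1.0 for aligned vectors, 0.0 for orthogonal ones."""
    dot = sum(a * b for a, b in zip(label, prediction))
    norm = math.sqrt(sum(a * a for a in label)) * math.sqrt(sum(b * b for b in prediction))
    if norm == 0.0:
        # Treat a zero vector as contributing no loss instead of dividing by zero.
        return 0.0
    return -dot / norm


loss_aligned = cosine_similarity_loss([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
loss_orthogonal = cosine_similarity_loss([1.0, 0.0], [0.0, 1.0])

# Negating the loss recovers the similarity, as the cos_sim metric does.
cos_sim_aligned = -loss_aligned
```

Minimising this loss pushes the predicted vector towards the direction of the label vector, independent of its magnitude.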

Utils

BaseParams.from_json

BaseParams.from_json(json_path:str=None)

Load json file as params.

json_path must not be None if no problem has been assigned to params.

Args: json_path (str, optional): Path to json file. Defaults to None.

Raises: AttributeError

BaseParams.to_json

BaseParams.to_json()

Save the params as json files. Please note that processing_fn is not saved.
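Functions cannot be represented in JSON, which is a likely reason why processing_fn is not saved. The sketch below shows a save/load round trip on a plain dictionary that drops non-serializable values. `serializable_items` and `toy_params` are hypothetical and only mimic the behaviour described above; they do not reproduce the file layout m3tl writes.

```python
import json
import os
import tempfile
from typing import Any, Dict


def serializable_items(params: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the entries that json can encode (callables are dropped)."""
    kept = {}
    for key, value in params.items():
        try:
            json.dumps(value)
        except TypeError:
            continue
        kept[key] = value
    return kept


toy_params = {
    'batch_size': 32,
    'problem_type': {'toy_cls': 'cls'},
    'processing_fn': lambda: None,  # not JSON serializable
}

with tempfile.TemporaryDirectory() as tmp_dir:
    json_path = os.path.join(tmp_dir, 'params.json')
    with open(json_path, 'w') as f:
        json.dump(serializable_items(toy_params), f)
    with open(json_path) as f:
        restored = json.load(f)
```

After loading, processing functions presumably have to be registered again, since they are absent from the file.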

BaseParams.parse_problem_string

BaseParams.parse_problem_string(flag_string:str)

Parse problem string

Arguments: flag_string {str} -- problem string

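Returns: tuple -- the sorted list of all problem names, and the list of problem chunks (chunks are separated by |, and problems joined with & share a chunk)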

print('chained with |: ', params.parse_problem_string('toy_seq_tag|toy_cls'))
print('chained with &: ', params.parse_problem_string('toy_seq_tag&toy_cls'))
chained with |:  (['toy_cls', 'toy_seq_tag'], [['toy_seq_tag'], ['toy_cls']])
chained with &:  (['toy_cls', 'toy_seq_tag'], [['toy_seq_tag', 'toy_cls']])
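The output above is consistent with a simple two-level split: `|` separates chunks and `&` joins problems within a chunk. A minimal re-implementation under that assumption follows. `parse_flag_string` is a hypothetical name, and the real method may perform additional validation.

```python
from typing import List, Tuple


def parse_flag_string(flag_string: str) -> Tuple[List[str], List[List[str]]]:
    """Split a flag string into (sorted problem names, problem chunks)."""
    # '|' separates chunks; '&' joins problems inside one chunk.
    chunks = [chunk.split('&') for chunk in flag_string.split('|')]
    # Flatten, de-duplicate and sort to get the list of all problems.
    names = sorted({name for chunk in chunks for name in chunk})
    return names, chunks


piped = parse_flag_string('toy_seq_tag|toy_cls')
joined = parse_flag_string('toy_seq_tag&toy_cls')
mixed = parse_flag_string('cws|POS|weibo_ner&weibo_cws')
```

For the mixed example from the assign_problem documentation, this sketch yields three chunks, the last of which contains both weibo_ner and weibo_cws.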

BaseParams.get_problem_type

BaseParams.get_problem_type(problem:str)

params.get_problem_type('toy_seq_tag')
'seq_tag'

BaseParams.update_train_steps

BaseParams.update_train_steps(train_steps_per_epoch:int, epoch:int=None, warmup_ratio=0.1)

If the batch_size is dynamic, we have to loop through the tf.data.Dataset to get the accurate number of training steps. In that case, this function updates train_steps, which is used to calculate the learning rate schedule.

WARNING: this update must be called before the model is compiled!

Args: train_steps_per_epoch (int): new number of training steps per epoch

params.update_train_steps(train_steps_per_epoch=100)
print(params.train_steps, params.num_warmup_steps)
1500 150
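The printed values are consistent with train_steps being the product of steps per epoch and the number of epochs, and with warmup steps being a fixed fraction of the total. The sketch below reproduces the numbers under the assumption that the params use 15 epochs; that value is inferred from the output and is not stated in the documentation. `compute_schedule_lengths` is a hypothetical helper.

```python
from typing import Tuple


def compute_schedule_lengths(train_steps_per_epoch: int, epoch: int,
                             warmup_ratio: float = 0.1) -> Tuple[int, int]:
    """Return (total training steps, warmup steps)."""
    train_steps = train_steps_per_epoch * epoch
    num_warmup_steps = int(train_steps * warmup_ratio)
    return train_steps, num_warmup_steps


# epoch=15 is an assumption chosen to match the output shown above.
train_steps, num_warmup_steps = compute_schedule_lengths(
    train_steps_per_epoch=100, epoch=15)
```

Because the learning rate schedule depends on both numbers, they need to be final before the optimizer is built, which matches the warning about calling the update before compiling the model.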

BaseParams.assign_data_sampling_strategy

BaseParams.assign_data_sampling_strategy(sampling_strategy_name='data_balanced', sampling_strategy_fn:Callable=None)

Set data sampling strategy for multi-task learning.

'data_balanced' and 'problem_balanced' are implemented by default. data_balanced: the sampling weight of a problem chunk equals its number of rows. problem_balanced: the sampling weight equals 1 for every problem chunk.

Args:

  • sampling_strategy_name (str, optional): sampling strategy. Defaults to 'data_balanced'.
  • sampling_strategy_fn (Callable, optional): function to create weight dict. Defaults to None.

Raises:

  • NotImplementedError: sampling_strategy_fn is not implemented yet
  • ValueError: invalid sampling_strategy provided

Returns:

  • Dict[str, float]: sampling weight for each problem_chunk
params.assign_data_sampling_strategy(sampling_strategy_name='problem_balanced')
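The two built-in strategies can be sketched as weight dictionaries computed from the number of rows per problem chunk. `make_sampling_weights` and the row counts are hypothetical; the sketch only illustrates the weighting rules described above and how such weights could drive sampling.

```python
import random
from typing import Dict


def make_sampling_weights(rows_per_chunk: Dict[str, int],
                          strategy: str = 'data_balanced') -> Dict[str, float]:
    """Return a sampling weight for each problem chunk."""
    if strategy == 'data_balanced':
        # Larger datasets are sampled proportionally more often.
        return {chunk: float(rows) for chunk, rows in rows_per_chunk.items()}
    if strategy == 'problem_balanced':
        # Every problem chunk is equally likely, regardless of its size.
        return {chunk: 1.0 for chunk in rows_per_chunk}
    raise ValueError('invalid sampling strategy: {}'.format(strategy))


rows = {'toy_cls': 900, 'toy_seq_tag': 100}
data_weights = make_sampling_weights(rows, 'data_balanced')
problem_weights = make_sampling_weights(rows, 'problem_balanced')

# Draw a few chunks according to the data-balanced weights.
rng = random.Random(0)
sampled = rng.choices(list(data_weights), weights=list(data_weights.values()), k=5)
```

With data_balanced, small problems are seen rarely; problem_balanced trades that for repeated passes over small datasets.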