Augmentations

Because this will help

Patching


source

create_patch3d


def create_patch3d(
    xb, frame_len, frame_stride, patch_len, patch_stride, constant_pad:bool=False, constant_pad_value:int=0
):

xb: [bs x n_vars x seq_len]


source

create_patch


def create_patch(
    xb, patch_len, stride, constant_pad:bool=False, constant_pad_value:int=0
):

xb: [bs x n_vars x seq_len] out: [bs x num_patch x n_vars x patch_len]

batch_size = 2
n_vars = 256
max_len = 100

# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (batch_size,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)

# test seq_len > patch_len > stride (overlapping patches)
xb = create_patch(x_nested, patch_len=10, stride=5, constant_pad=False)
#xb_rep = create_patch(x_nested, patch_len=500, stride=500, constant_pad=True)
#x_nested.shape, xb.shape, xb_rep.shape
#xb_rep_short = create_patch(x_short, patch_len=502, stride=500, replication_pad=False)
batch_size = 2
n_vars = 7
max_len = 100

# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (batch_size,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)

# test non-overlapping frames (frame_len == frame_stride) with overlapping patches (patch_len > patch_stride)
xb = create_patch3d(x_nested, patch_len=4, patch_stride=3, frame_len=9, frame_stride=9, constant_pad=True)
#xb_rep = create_patch(x_nested, patch_len=500, stride=500, constant_pad=True)
#x_nested.shape, xb.shape, xb_rep.shape
#xb_rep_short = create_patch(x_short, patch_len=502, stride=500, replication_pad=False)
x = torch.randn(1, 7, 100)
xb = create_patch(x, patch_len=10, stride=10, constant_pad=True)
xb.shape
torch.Size([1, 10, 7, 10])

source

create_patch3d


def create_patch3d(
    xb, frame_len, frame_stride, patch_len, patch_stride, constant_pad:bool=False, constant_pad_value:int=0
):

xb: [bs x n_vars x seq_len]


source

unpatch


def unpatch(
    x, seq_len, remove_padding:bool=True
):

x: [bs/None x patch_num x n_vars x patch_len] returns x: [bs x n_vars x seq_len]

x = torch.randn(1,1,50)

# test seq_len > patch_len == stride; 50 is not a multiple of 6, so the sequence is padded to 54
xb = create_patch(x, patch_len=6, stride=6, constant_pad=True)
xb = unpatch(xb, seq_len=50, remove_padding=False)
xb.shape
torch.Size([1, 1, 54])

Patch Masking


source

mask_patches_simple


def mask_patches_simple(
    xb, mask_ratio
):

Function that masks patches using fixed ratio approach similar to random_masking

xb: [bs x patch_num x n_vars x patch_len] or nested tensor. Returns: x_masked, the masked tensor with the same shape as the input; mask, a binary mask where 1 indicates masked positions (bs x patch_num x n_vars).


source

random_masking


def random_masking(
    xb, mask_ratio
):

Call self as a function.

x = torch.randn(50,16,7,50)
mask_ratio = 0.4

x_new = mask_patches_simple(x,mask_ratio=mask_ratio)
x_new.shape, x.shape
(torch.Size([50, 16, 7, 50]), torch.Size([50, 16, 7, 50]))
patch_len = 750
d_model = 12
n_vars = 7
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(length, n_vars, patch_len) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)

x = mask_patches_simple(x_nested, mask_ratio=0.5)
x[0].shape, x[1].shape, x_nested[0].shape, x_nested[1].shape
(torch.Size([70, 7, 750]),
 torch.Size([52, 7, 750]),
 torch.Size([70, 7, 750]),
 torch.Size([52, 7, 750]))

Value Augmentations


source

jitter_augmentation


def jitter_augmentation(
    x, mask_ratio:float=0.05, jitter_ratio:float=0.05, p:int=1
):

Call self as a function.


source

remove_values


def remove_values(
    x, mask_ratio
):

Call self as a function.

patch_len = 750
d_model = 12
n_vars = 7
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, max_len) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)

x = jitter_augmentation(x_nested, mask_ratio=0.9, jitter_ratio=0.05)
x.shape
torch.Size([3, 7, j8])
## Note that the random number generator advances state on every call, so the
## seed is reset before each call to obtain identical augmentations.
torch.manual_seed(42)
x = torch.randn(4,7,1000)

# jitter_augmentation returns a single tensor (see the nested-tensor example,
# where `.shape` is read directly off the result), so the result must not be
# unpacked into two names.
torch.manual_seed(42)
x_new = jitter_augmentation(x)

torch.manual_seed(42)
x_new2 = jitter_augmentation(x)

# With the same seed the two augmented tensors are expected to match.
torch.equal(x_new, x_new2)

Shuffle Augmentations


source

shuffle_dim


def shuffle_dim(
    x, dim:int=1, p:float=0.5
):

Randomly shuffles x along dimension `dim`. x: [bs x n_channels x n_patches x patch_len]


source

reverse_sequence


def reverse_sequence(
    x, seq_dim:tuple=(-1,), p:float=0.5
):

Call self as a function.

x = torch.randn(4,1,5,5).to('cuda')

torch.equal(shuffle_dim(x), x)
False

source

channel_masking


def channel_masking(
    x, dim:int=1, p:float=0.5, specific_channels:NoneType=None
):

Randomly masks up to (n_channels - 1) channels of x, or masks the specific channels if provided.

x = torch.randn(4,7,5,5)

x_new = channel_masking(x, dim=1, p=0.5)

patch_len = 750
d_model = 12
n_vars = 7
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)

x = channel_masking(x, dim=1, p=1.5, specific_channels=[0,1])

source

random_crop


def random_crop(
    x, c_in, min_len, p:float=0.5
):

Args: x: either a regular tensor [n_channels x seq_len] or a nested tensor with variable lengths. Returns: a cropped version of the input with a random length.

x = torch.randn(1, 7, 100)
print(x.shape)
y = random_crop(x_nested, 7, 10)
print(y.shape)
[y_i.shape for y_i in y]
torch.Size([1, 7, 100])
torch.Size([3, 7, j1])
[torch.Size([7, 60]), torch.Size([7, 58]), torch.Size([7, 83])]

Transforms


source

IntraClassCutMix1d


def IntraClassCutMix1d(
    mix_prob:float=0.5, # probability of applying cutmix
    return_y_every_sec:int=30, # length of segment to mix, if one value of y corresponds to 30 seconds of signal data, this should be set to 30.
    frequency:int=125, # frequency of the data
    ignore_index:int=-100
):

Intra-class CutMix for 1D data (e.g., time-series).

This is a callback that can be used to apply CutMix to the training data. It is used to mix segments within the same class.

x = torch.randn(4,7,90)
x_c = x.clone()
y = torch.randint(0, 5, size=(4,90//30))
xxt = IntraClassCutMix1d(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = (x,y)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False
True
n_vars = 4
max_len = 90
bs = 3
frequnecy = 1
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
y_list = [torch.randint(0, 5, size=(length,)) for length in seq_lens]
y_nested = torch.nested.as_nested_tensor(y_list, layout=torch.jagged)

xxt = IntraClassCutMix1d(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = (x_nested, y_nested)
batch = xxt(batch)
batch[0], batch[1]
(NestedTensor(size=(3, 4, j167), offsets=tensor([  0,  51, 135, 187]), contiguous=True),
 NestedTensor(size=(3, j168), offsets=tensor([0, 2, 4, 6]), contiguous=True))

source

IntraClassCutMixBatch


def IntraClassCutMixBatch(
    mix_prob:float=0.5, # probability of applying cutmix
    return_y_every_sec:int=30, # length of segment to mix, if one value of y corresponds to 30 seconds of signal data, this should be set to 30.
    frequency:int=125, # frequency of the data
    ignore_index:int=-100,
    intra_class_only:bool=True, # whether to mix only within same class (True) or across all classes (False)
):

Intra-class CutMix for 1D data (e.g., time-series).

This is a callback that can be used to apply CutMix to the training data. It is used to mix segments within the same class.

This is different to IntraClassCutMix1d in that it mixes segments of the same class across batches of data, rather than just at the same segment

x = torch.randn(4,7,90)
x_c = x.clone()
y = torch.randint(0, 5, size=(4,90//30))
xxt = IntraClassCutMixBatch(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = [x,y]
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False
n_vars = 4
max_len = 90
bs = 3
frequnecy = 1
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
y_list = [torch.randint(0, 5, size=(length,)) for length in seq_lens]
y_nested = torch.nested.as_nested_tensor(y_list, layout=torch.jagged)

xxt = IntraClassCutMixBatch(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = [x_nested, y_nested]
batch = xxt(batch)
batch[0], batch[1]

source

MixupCallbackSequence


def MixupCallbackSequence(
    num_classes, return_y_every_sec:int=30, frequency:NoneType=None,
    mixup_alpha:float=0.4, # alpha parameter for the beta distribution
    ignore_index:int=-100, # ignore index
):

Mixup for 1D data (e.g., time-series).

This callback applies Mixup to the training data, blending both the input data and the labels.

See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43

Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.

x = torch.randn(4,7,90)
x_c = x.clone()
y_og = torch.randint(0, 2, size=(4,90//30))
y_og[1,2] = -100
y_og[2,1] = -100
y_c = y_og.clone()
xxt = MixupCallbackSequence(num_classes=2, mixup_alpha=0.4)
batch = (x,y_og)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False, torch.equal(y_c, batch[1]) == False
(True, True)

source

MixupCallbackClassification


def MixupCallbackClassification(
    num_classes, mixup_alpha:float=0.4, # alpha parameter for the beta distribution
    ignore_index:int=-100, # ignore index
):

Mixup for 1D data (e.g., time-series).

This callback applies Mixup to the training data, blending both the input data and the labels.

See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43

Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.

x = torch.randn(4,7,90)
x_c = x.clone()
y_og = torch.randint(0, 2, size=(4,2))
print(y_og, y_og.shape)
y_c = y_og.clone()
xxt = MixupCallbackClassification(num_classes=2, mixup_alpha=0.4)
batch = (x,y_og)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False, torch.equal(y_c, batch[1]) == False
tensor([[0, 1],
        [0, 0],
        [0, 1],
        [0, 1]]) torch.Size([4, 2])
tensor([[0., 1.],
        [0., 0.],
        [0., 1.],
        [0., 1.]]) tensor([[0., 1.],
        [0., 0.],
        [0., 1.],
        [0., 1.]])
(True, False)
n_vars = 4
max_len = 90
bs = 3
frequnecy = 1
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
y_list = [torch.randint(0, 5, size=(length,)) for length in seq_lens]
y_nested = torch.nested.as_nested_tensor(y_list, layout=torch.jagged)

batch = [x_nested, y_nested]

xxt = MixupCallbackSequence(num_classes=5, mixup_alpha=0.4, return_y_every_sec=30, frequency=frequnecy)
batch = [x_nested, y_nested]
batch = xxt(batch)
batch[0], batch[1]
(NestedTensor(size=(3, 4, j173), offsets=tensor([  0,  63, 133, 204]), contiguous=True),
 NestedTensor(size=(3, 5, j174), offsets=tensor([0, 2, 4, 6]), contiguous=True))

source

MixupTimeDemos


def MixupTimeDemos(
    mixup_alpha:float=0.4, # alpha parameter for the beta distribution
    mix_time:bool=False, # whether to mix time to event
    mix_demographics:bool=False, # whether to mix demographics
):

Mixup for 1D data and time to event data + demographics

This callback applies Mixup to the training data, blending both the input data and the labels.

See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43

Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.

x = torch.randn(4,7,90)
x_c = x.clone()
y_og = torch.randint(0, 2, (4,))
time_idx = torch.randint(0, 18, (4,)).float()
age_demographics = torch.randint(20, 80, (4,)).float()
gender_demographics = torch.randint(0, 2, (4,)).float()
demographics = torch.stack([age_demographics, gender_demographics], dim=1)
print(demographics, demographics.shape)
print(y_og, y_og.shape)
y_c = y_og.clone()
xxt = MixupTimeDemos(mixup_alpha=0.4)
batch = (x,y_og, time_idx, demographics)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False, torch.equal(y_c, batch[1]) == False
tensor([[45.,  1.],
        [62.,  1.],
        [32.,  0.],
        [44.,  1.]]) torch.Size([4, 2])
tensor([1, 0, 0, 1]) torch.Size([4])
(True, True)

source

MixupTimeDemosNoX


def MixupTimeDemosNoX(
    mixup_alpha:float=0.4, # alpha parameter for the beta distribution
    mix_time:bool=False, # whether to mix time to event
):

Mixup for 1D data and time to event data + demographics

This callback applies Mixup to the training data, blending both the input data and the labels.

See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43

Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.


source

TransformsCallback


def TransformsCallback(
    transforms
):

Applies a series of transforms to the input data, on train_batch_start.


source

MixupPretrainCallback


def MixupPretrainCallback(
    mixup_alpha:float=0.4, # alpha parameter for the beta distribution
):

Mixup for 1D data (e.g., time-series).

This callback applies Mixup to the training data, blending the input data.

See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43


source

VariableChannelInput


def VariableChannelInput(
    indexes_to_add_channels, n_channels_expected, channel_dim:int=1
):

Randomly adds all-zero channels at the correct indexes of the input data to match the number of channels expected by the self-supervised model.

callback = VariableChannelInput(
    indexes_to_add_channels=[0],  # Add channel at index 0
    n_channels_expected=5,        # Total channels expected
    channel_dim=1                 # Channel dimension
)

x1 = torch.randn(4,4,90)
x2 = callback(x1)
x1.shape, x2.shape
(torch.Size([4, 4, 90]), torch.Size([4, 5, 90]))
n_vars = 4
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))

# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)

callback = VariableChannelInput(
    indexes_to_add_channels=[0],  # Add channel at index 0
    n_channels_expected=5,        # Total channels expected
    channel_dim=1                 # Channel dimension
)

x_nested2 = callback(x_nested)
x_nested.shape, x_nested2.shape
(torch.Size([3, 4, j1]), torch.Size([3, 5, j3]))
x_reversed = callback.reverse(x_nested2)
torch.equal(x_nested.to_padded_tensor(0), x_reversed.to_padded_tensor(0))
True
# Create a simple dataset
class SimpleDataset(Dataset):
    """Toy in-memory dataset of random signals paired with random integer labels.

    Each item is a ``(signal, label)`` pair where ``signal`` has shape
    ``[num_channels, seq_length]`` and ``label`` is an integer class in 0-4.
    """

    def __init__(self, num_samples=100, seq_length=90, num_channels=4):
        # Signals are laid out as [samples, channels, seq_len].
        signal_shape = (num_samples, num_channels, seq_length)
        # Signals are drawn before labels so RNG consumption order stays fixed.
        self.data = torch.randn(*signal_shape)
        # One random class label (0-4) per sample.
        self.labels = torch.randint(low=0, high=5, size=(num_samples,))

    def __len__(self):
        # The number of samples is the size of the leading dimension.
        return self.data.shape[0]

    def __getitem__(self, idx):
        sample, label = self.data[idx], self.labels[idx]
        return sample, label

# Create dataset and dataloader
dataset = SimpleDataset()
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# Create callback instance
# Let's say we want to add a zero channel at index 0, 
# and our model expects 5 channels total (4 original + 1 added)
callback = VariableChannelInput(
    indexes_to_add_channels=[0],  # Add channel at index 0
    n_channels_expected=5,        # Total channels expected
    channel_dim=1                 # Channel dimension
)

# Test the callback
for batch in dataloader:
    print("Original batch shape:", batch[0].shape)  # Should be [16, 4, 90]
    
    # Simulate what happens in Lightning
    callback.on_train_batch_start(None, None, batch, 0)
    print("Modified batch shape:", batch[0].shape)  # Should be [16, 5, 90]
    
    # Check that the first channel is zeros
    print("First channel is all zeros:", torch.all(batch[0][:, 0] == 0))
    
    break  # Just test one batch
Original batch shape: torch.Size([16, 4, 90])
Modified batch shape: torch.Size([16, 5, 90])
First channel is all zeros: tensor(True)