# Example: patching a jagged (variable-length) batch with create_patch.
batch_size = 2
n_vars = 256
max_len = 100
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (batch_size,))
# Create input tensors with different sequence lengths
# each sample is [n_vars x length] with its own length
x_list = [torch.randn(n_vars, length) for length in seq_lens]
# Pack into a nested tensor with the jagged layout so the batch keeps per-sample lengths
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
# test seq_len > patch len == stride
# overlapping patches: stride (5) < patch_len (10), no padding of the tail
xb = create_patch(x_nested, patch_len=10, stride=5, constant_pad=False)
#xb_rep = create_patch(x_nested, patch_len=500, stride=500, constant_pad=True)
#x_nested.shape, xb.shape, xb_rep.shape
#xb_rep_short = create_patch(x_short, patch_len=502, stride=500, replication_pad=False)
Augmentations
Patching
create_patch3d
def create_patch3d(
xb, frame_len, frame_stride, patch_len, patch_stride, constant_pad:bool=False, constant_pad_value:int=0
):
xb: [bs x n_vars x seq_len]
create_patch
def create_patch(
xb, patch_len, stride, constant_pad:bool=False, constant_pad_value:int=0
):
xb: [bs x n_vars x seq_len] out: [bs x num_patch x n_vars x patch_len]
batch_size = 2
n_vars = 7
max_len = 100
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (batch_size,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
# test seq_len > patch len == stride
xb = create_patch3d(x_nested, patch_len=4, patch_stride=3, frame_len=9, frame_stride=9, constant_pad=True)
#xb_rep = create_patch(x_nested, patch_len=500, stride=500, constant_pad=True)
#x_nested.shape, xb.shape, xb_rep.shape
#xb_rep_short = create_patch(x_short, patch_len=502, stride=500, replication_pad=False)
x = torch.randn(1, 7, 100)
xb = create_patch(x, patch_len=10, stride=10, constant_pad=True)
xb.shape
torch.Size([1, 10, 7, 10])
create_patch3d
def create_patch3d(
xb, frame_len, frame_stride, patch_len, patch_stride, constant_pad:bool=False, constant_pad_value:int=0
):
xb: [bs x n_vars x seq_len]
unpatch
def unpatch(
x, seq_len, remove_padding:bool=True
):
x: [bs/None x patch_num x n_vars x patch_len] returns x: [bs x n_vars x seq_len]
# Example: round-tripping create_patch -> unpatch on a single-channel series.
x = torch.randn(1,1,50)
# test seq_len > patch len == stride
# non-overlapping patches; constant_pad=True pads 50 -> 54 so it divides by 6
xb = create_patch(x, patch_len=6, stride=6, constant_pad=True)
# remove_padding=False keeps the padded tail, so the result is longer than seq_len
xb = unpatch(xb, seq_len=50, remove_padding=False)
xb.shape
torch.Size([1, 1, 54])
Patch Masking
mask_patches_simple
def mask_patches_simple(
xb, mask_ratio
):
Function that masks patches using fixed ratio approach similar to random_masking
xb: [bs x patch_num x n_vars x patch_len] or nested tensor Returns: x_masked: masked tensor with same shape as input mask: binary mask where 1 indicates masked positions (bs x patch_num x n_vars)
random_masking
def random_masking(
xb, mask_ratio
):
Call self as a function.
x = torch.randn(50,16,7,50)
mask_ratio = 0.4
x_new = mask_patches_simple(x,mask_ratio=mask_ratio)
x_new.shape, x.shape
(torch.Size([50, 16, 7, 50]), torch.Size([50, 16, 7, 50]))
patch_len = 750
d_model = 12
n_vars = 7
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(length, n_vars, patch_len) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
x = mask_patches_simple(x_nested, mask_ratio=0.5)
x[0].shape, x[1].shape, x_nested[0].shape, x_nested[1].shape
(torch.Size([70, 7, 750]),
torch.Size([52, 7, 750]),
torch.Size([70, 7, 750]),
torch.Size([52, 7, 750]))
Value Augmentations
jitter_augmentation
def jitter_augmentation(
x, mask_ratio:float=0.05, jitter_ratio:float=0.05, p:int=1
):
Call self as a function.
remove_values
def remove_values(
x, mask_ratio
):
Call self as a function.
patch_len = 750
d_model = 12
n_vars = 7
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, max_len) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
x = jitter_augmentation(x_nested, mask_ratio=0.9, jitter_ratio=0.05)
x.shape
torch.Size([3, 7, j8])
## note that the random number generator advances state...
torch.manual_seed(42)
x = torch.randn(4,7,1000)
torch.manual_seed(42)
x_new, n_masks = jitter_augmentation(x)
n_masks /(4* 7*1000)
torch.manual_seed(42)
x_new2, n_masks2 = jitter_augmentation(x)
torch.equal(x_new, x_new2)
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) Cell In[18], line 7 4 x = torch.randn(4,7,1000) 6 torch.manual_seed(42) ----> 7 x_new, n_masks = jitter_augmentation(x) 8 n_masks /(4* 7*1000) 10 torch.manual_seed(42) ValueError: too many values to unpack (expected 2)
Shuffle Augmentations
shuffle_dim
def shuffle_dim(
x, dim:int=1, p:float=0.5
):
shuffles a dimension randomly along dim x: [bs x n channels x n patches x patch len]
reverse_sequence
def reverse_sequence(
x, seq_dim:tuple=(-1,), p:float=0.5
):
Call self as a function.
x = torch.randn(4,1,5,5).to('cuda')
torch.equal(shuffle_dim(x), x)
False
channel_masking
def channel_masking(
x, dim:int=1, p:float=0.5, specific_channels:NoneType=None
):
Masks up to n channels - 1 randomly of x or specific channels if provided
x = torch.randn(4,7,5,5)
x_new = channel_masking(x, dim=1, p=0.5)
patch_len = 750
d_model = 12
n_vars = 7
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
x = channel_masking(x, dim=1, p=1.5, specific_channels=[0,1])
random_crop
def random_crop(
x, c_in, min_len, p:float=0.5
):
Args: x: Either a regular tensor [n_channels x seq_len] or a nested tensor with variable lengths Returns: Cropped version of input with random length
x = torch.randn(1, 7, 100)
print(x.shape)
y = random_crop(x_nested, 7, 10)
print(y.shape)
[y_i.shape for y_i in y]
torch.Size([1, 7, 100])
torch.Size([3, 7, j1])
[torch.Size([7, 60]), torch.Size([7, 58]), torch.Size([7, 83])]
Transforms
IntraClassCutMix1d
def IntraClassCutMix1d(
mix_prob:float=0.5, # probability of applying cutmix
return_y_every_sec:int=30, # length of segment to mix, if one value of y corresponds to 30 seconds of signal data, this should be set to 30.
frequency:int=125, # frequency of the data
ignore_index:int=-100
):
Intra-class CutMix for 1D data (e.g., time-series).
This is a callback that can be used to apply CutMix to the training data. It is used to mix segments within the same class.
x = torch.randn(4,7,90)
x_c = x.clone()
y = torch.randint(0, 5, size=(4,90//30))
xxt = IntraClassCutMix1d(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = (x,y)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False
True
n_vars = 4
max_len = 90
bs = 3
frequency = 1
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
y_list = [torch.randint(0, 5, size=(length,)) for length in seq_lens]
y_nested = torch.nested.as_nested_tensor(y_list, layout=torch.jagged)
xxt = IntraClassCutMix1d(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = (x_nested, y_nested)
batch = xxt(batch)
batch[0], batch[1]
(NestedTensor(size=(3, 4, j167), offsets=tensor([ 0, 51, 135, 187]), contiguous=True),
NestedTensor(size=(3, j168), offsets=tensor([0, 2, 4, 6]), contiguous=True))
IntraClassCutMixBatch
def IntraClassCutMixBatch(
mix_prob:float=0.5, # probability of applying cutmix
return_y_every_sec:int=30, # length of segment to mix, if one value of y corresponds to 30 seconds of signal data, this should be set to 30.
frequency:int=125, # frequency of the data
ignore_index:int=-100,
intra_class_only:bool=True, # whether to mix only within same class (True) or across all classes (False)
):
Intra-class CutMix for 1D data (e.g., time-series).
This is a callback that can be used to apply CutMix to the training data. It is used to mix segments within the same class.
This is different to IntraClassCutMix1d in that it mixes segments of the same class across batches of data, rather than just at the same segment
x = torch.randn(4,7,90)
x_c = x.clone()
y = torch.randint(0, 5, size=(4,90//30))
xxt = IntraClassCutMixBatch(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = [x,y]
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False
n_vars = 4
max_len = 90
bs = 3
frequency = 1
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
y_list = [torch.randint(0, 5, size=(length,)) for length in seq_lens]
y_nested = torch.nested.as_nested_tensor(y_list, layout=torch.jagged)
xxt = IntraClassCutMixBatch(mix_prob=1, frequency=1, return_y_every_sec=30)
batch = [x_nested, y_nested]
batch = xxt(batch)
batch[0], batch[1]
MixupCallbackSequence
def MixupCallbackSequence(
num_classes, return_y_every_sec:int=30, frequency:NoneType=None,
mixup_alpha:float=0.4, # alpha parameter for the beta distribution
ignore_index:int=-100, # ignore index
):
Mixup for 1D data (e.g., time-series).
This callback applies Mixup to the training data, blending both the input data and the labels.
See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43
Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.
x = torch.randn(4,7,90)
x_c = x.clone()
y_og = torch.randint(0, 2, size=(4,90//30))
y_og[1,2] = -100
y_og[2,1] = -100
y_c = y_og.clone()
xxt = MixupCallbackSequence(num_classes=2, mixup_alpha=0.4)
batch = (x,y_og)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False, torch.equal(y_c, batch[1]) == False
(True, True)
MixupCallbackClassification
def MixupCallbackClassification(
num_classes, mixup_alpha:float=0.4, # alpha parameter for the beta distribution
ignore_index:int=-100, # ignore index
):
Mixup for 1D data (e.g., time-series).
This callback applies Mixup to the training data, blending both the input data and the labels.
See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43
Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.
x = torch.randn(4,7,90)
x_c = x.clone()
y_og = torch.randint(0, 2, size=(4,2))
print(y_og, y_og.shape)
y_c = y_og.clone()
xxt = MixupCallbackClassification(num_classes=2, mixup_alpha=0.4)
batch = (x,y_og)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False, torch.equal(y_c, batch[1]) == False
tensor([[0, 1],
[0, 0],
[0, 1],
[0, 1]]) torch.Size([4, 2])
tensor([[0., 1.],
[0., 0.],
[0., 1.],
[0., 1.]]) tensor([[0., 1.],
[0., 0.],
[0., 1.],
[0., 1.]])
(True, False)
n_vars = 4
max_len = 90
bs = 3
frequency = 1
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
y_list = [torch.randint(0, 5, size=(length,)) for length in seq_lens]
y_nested = torch.nested.as_nested_tensor(y_list, layout=torch.jagged)
batch = [x_nested, y_nested]
xxt = MixupCallbackSequence(num_classes=5, mixup_alpha=0.4, return_y_every_sec=30, frequency=frequency)
batch = [x_nested, y_nested]
batch = xxt(batch)
batch[0], batch[1]
(NestedTensor(size=(3, 4, j173), offsets=tensor([ 0, 63, 133, 204]), contiguous=True),
NestedTensor(size=(3, 5, j174), offsets=tensor([0, 2, 4, 6]), contiguous=True))
MixupTimeDemos
def MixupTimeDemos(
mixup_alpha:float=0.4, # alpha parameter for the beta distribution
mix_time:bool=False, # whether to mix time to event
mix_demographics:bool=False, # whether to mix demographics
):
Mixup for 1D data and time to event data + demographics
This callback applies Mixup to the training data, blending both the input data and the labels.
See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43
Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.
x = torch.randn(4,7,90)
x_c = x.clone()
y_og = torch.randint(0, 2, (4,))
time_idx = torch.randint(0, 18, (4,)).float()
age_demographics = torch.randint(20, 80, (4,)).float()
gender_demographics = torch.randint(0, 2, (4,)).float()
demographics = torch.stack([age_demographics, gender_demographics], dim=1)
print(demographics, demographics.shape)
print(y_og, y_og.shape)
y_c = y_og.clone()
xxt = MixupTimeDemos(mixup_alpha=0.4)
batch = (x,y_og, time_idx, demographics)
batch = xxt(batch)
torch.equal(x_c, batch[0]) == False, torch.equal(y_c, batch[1]) == False
tensor([[45., 1.],
[62., 1.],
[32., 0.],
[44., 1.]]) torch.Size([4, 2])
tensor([1, 0, 0, 1]) torch.Size([4])
(True, True)
MixupTimeDemosNoX
def MixupTimeDemosNoX(
mixup_alpha:float=0.4, # alpha parameter for the beta distribution
mix_time:bool=False, # whether to mix time to event
):
Mixup for 1D data and time to event data + demographics
This callback applies Mixup to the training data, blending both the input data and the labels.
See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43
Note that this creates non-integer labels/soft labels. Loss functions should be able to handle this.
TransformsCallback
def TransformsCallback(
transforms
):
Applies a series of transforms to the input data, on train_batch_start.
MixupPretrainCallback
def MixupPretrainCallback(
mixup_alpha:float=0.4, # alpha parameter for the beta distribution
):
Mixup for 1D data (e.g., time-series).
This callback applies Mixup to the training data, blending both the input data
See tsai implementation here: https://github.com/timeseriesAI/tsai/blob/bdff96cc8c4c8ea55bc20d7cffd6a72e402f4cb2/tsai/data/mixed_augmentation.py#L43
VariableChannelInput
def VariableChannelInput(
indexes_to_add_channels, n_channels_expected, channel_dim:int=1
):
Randomly adds 0 channels at correct index to the input data to match the number of channels expected by the self supervised model.
callback = VariableChannelInput(
indexes_to_add_channels=[0], # Add channel at index 0
n_channels_expected=5, # Total channels expected
channel_dim=1 # Channel dimension
)
x1 = torch.randn(4,4,90)
x2 = callback(x1)
x1.shape, x2.shape
(torch.Size([4, 4, 90]), torch.Size([4, 5, 90]))
n_vars = 4
max_len = 100
bs = 3
# Create sequences of different lengths
seq_lens = torch.randint(50, max_len, (bs,))
# Create input tensors with different sequence lengths
x_list = [torch.randn(n_vars, length) for length in seq_lens]
x_nested = torch.nested.as_nested_tensor(x_list, layout=torch.jagged)
callback = VariableChannelInput(
indexes_to_add_channels=[0], # Add channel at index 0
n_channels_expected=5, # Total channels expected
channel_dim=1 # Channel dimension
)
x_nested2 = callback(x_nested)
x_nested.shape, x_nested2.shape
(torch.Size([3, 4, j1]), torch.Size([3, 5, j3]))
x_reversed = callback.reverse(x_nested2)
torch.equal(x_nested.to_padded_tensor(0), x_reversed.to_padded_tensor(0))
True
# Create a simple dataset
class SimpleDataset(Dataset):
    """Toy dataset of random multichannel sequences with integer labels.

    Holds `num_samples` random tensors shaped [num_channels, seq_length],
    each paired with a random class label in [0, 5).
    """

    def __init__(self, num_samples=100, seq_length=90, num_channels=4):
        # [samples, channels, seq_len] — one random sequence per sample
        self.data = torch.randn(num_samples, num_channels, seq_length)
        # one random label 0-4 per sample
        self.labels = torch.randint(0, 5, (num_samples,))

    def __len__(self):
        # number of samples (first dimension of the data tensor)
        return self.data.shape[0]

    def __getitem__(self, idx):
        sample, label = self.data[idx], self.labels[idx]
        return sample, label
# Create dataset and dataloader
dataset = SimpleDataset()
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
# Create callback instance
# Let's say we want to add a zero channel at index 0,
# and our model expects 5 channels total (4 original + 1 added)
callback = VariableChannelInput(
indexes_to_add_channels=[0], # Add channel at index 0
n_channels_expected=5, # Total channels expected
channel_dim=1 # Channel dimension
)
# Test the callback
for batch in dataloader:
print("Original batch shape:", batch[0].shape) # Should be [16, 4, 90]
# Simulate what happens in Lightning
callback.on_train_batch_start(None, None, batch, 0)
print("Modified batch shape:", batch[0].shape) # Should be [16, 5, 90]
# Check that the first channel is zeros
print("First channel is all zeros:", torch.all(batch[0][:, 0] == 0))
break # Just test one batch
Original batch shape: torch.Size([16, 4, 90])
Modified batch shape: torch.Size([16, 5, 90])
First channel is all zeros: tensor(True)