# Text Classification with Neural Networks

**OBJECTIVES**
- Understand Text Vectorization through Count Vectorization, Sequential Representation, and Word Embeddings
- Use Multilayer Perceptrons, Recurrent Neural Networks, and Transformer Models for text classification
- Use `pytorch` to build text classification models
- Use `huggingface` and the `transformers` library to fine-tune existing large language models

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd

In [2]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

#### Classifying Jailbreak Prompts

A Jailbreak attack is an attempt to exploit an LLM's vunerabilities.  Today, we will take a few different approaches to identifying jailbreak attempts through text classification.

In [202]:
#read in data
splits = {'train': 'balanced/jailbreak_dataset_train_balanced.csv', 'test': 'balanced/jailbreak_dataset_test_balanced.csv'}
df = pd.read_csv("hf://datasets/jackhhao/jailbreak-classification/" + splits["train"])

In [4]:
#take a peek
df.head()

Unnamed: 0,prompt,type
0,You are a devoted fan of a celebrity.,benign
1,You are Joseph Seed from Far Cry 5. Sermonize ...,benign
2,"As Rei Ayanami from Neon Genesis Evangelion, t...",benign
3,"You are a medieval blacksmith named Wulfric, l...",benign
4,"Answer the following question: So, I worked wi...",benign


In [5]:
#create a tokenizer
tokenizer = Tokenizer(num_words = 500)

In [6]:
#fit the tokenizer -- learns the vocabulary
tokenizer.fit_on_texts(df['prompt'].values)

In [7]:
#look at tokenizer
tokenizer.num_words

500

In [8]:
#create document term matrix (binarized)
dtm = tokenizer.texts_to_matrix(df['prompt'].values)

In [9]:
#take a peek
dtm

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 1., 1., ..., 0., 0., 0.],
       [0., 1., 1., ..., 0., 0., 0.],
       ...,
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 1., 1., ..., 0., 0., 0.]])

In [10]:
tokenizer.index_word[2]

'and'

In [11]:
[tokenizer.index_word[i] for i in range(1, 50)]

['the',
 'and',
 'to',
 'you',
 'a',
 'of',
 'is',
 'in',
 'it',
 'as',
 'will',
 'that',
 'or',
 'are',
 'with',
 'not',
 'your',
 'i',
 'do',
 'for',
 'if',
 'this',
 'any',
 'be',
 'dan',
 'chatgpt',
 'can',
 'have',
 'answer',
 'an',
 'on',
 'always',
 'all',
 'by',
 'from',
 'about',
 'he',
 'must',
 'no',
 'like',
 'response',
 'anything',
 'should',
 'responses',
 'ai',
 'what',
 'now',
 'user',
 'but']

In [12]:
y = np.where(df['type'] == 'benign', 0, 1)

In [13]:
Xt = torch.tensor(dtm, dtype = torch.float32)
yt = torch.tensor(y, dtype = torch.float32)

In [14]:
from torch.utils.data import TensorDataset

In [15]:
from sklearn.model_selection import train_test_split

In [16]:
X_train, X_test, y_train, y_test = train_test_split(Xt, yt, test_size=.2)

In [17]:
X_train

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 1.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.]])

In [18]:
#create data class
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

In [19]:
#dataset and loader -- making batches of our bigger dataset
trainloader = DataLoader(train_dataset, batch_size = 32)
#dataset and loader
testloader = DataLoader(test_dataset, batch_size = 32)

In [20]:
model = nn.Sequential(nn.Linear(in_features=500, out_features=1000),
                      nn.ReLU(),
                      nn.Linear(1000, 100),
                      nn.ReLU(),
                      nn.Linear(100, 1),
                      nn.Sigmoid()
                      )

In [21]:
model(Xt)

tensor([[0.4850],
        [0.4867],
        [0.4842],
        ...,
        [0.4856],
        [0.4850],
        [0.4841]], grad_fn=<SigmoidBackward0>)

In [22]:
loss_fn = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr = 0.01)

In [23]:
from tqdm import tqdm

In [24]:
model = model.to('cuda')

In [25]:
#keep track of the losses
losses = []
#train it for 20 epochs
for epoch in tqdm(range(20)):
  #iterate over the batches
  for x,y in trainloader:
    x = x.to('cuda')
    y = y.to('cuda')
    #feeds data into model
    yhat = model(x)
    #evaluate the predictions
    loss = loss_fn(yhat, y.unsqueeze(1))
    #update the weights/params
    optimizer.zero_grad() #pytorch house cleaning
    loss.backward() #pass info backwards
    optimizer.step() #step towards less loss
    losses.append(loss.item()) #tracking the loss


100%|██████████| 20/20 [00:01<00:00, 15.23it/s]


In [26]:
X_train = X_train.to('cuda')
model(X_train)

tensor([[7.6080e-12],
        [1.0000e+00],
        [5.7215e-08],
        [5.2058e-10],
        [1.0000e+00],
        [1.2170e-06],
        [2.7917e-06],
        [2.5559e-06],
        [1.9318e-09],
        [4.4235e-06],
        [3.1658e-14],
        [1.3348e-10],
        [8.7853e-21],
        [1.8574e-26],
        [1.0863e-12],
        [1.0000e+00],
        [4.8573e-05],
        [1.3076e-08],
        [8.7744e-13],
        [1.0000e+00],
        [1.7011e-27],
        [9.1420e-06],
        [1.1157e-31],
        [2.3193e-10],
        [8.9439e-13],
        [1.0000e+00],
        [3.1938e-29],
        [2.1481e-13],
        [2.3711e-17],
        [1.5619e-18],
        [1.0000e+00],
        [1.0000e+00],
        [1.0000e+00],
        [1.0000e+00],
        [2.7602e-09],
        [1.0000e+00],
        [1.0000e+00],
        [5.4653e-13],
        [6.1534e-08],
        [1.0000e+00],
        [1.0756e-11],
        [3.5019e-08],
        [1.0000e+00],
        [1.6152e-35],
        [1.0000e+00],
        [1

In [27]:
train_predictions = model(X_train)
ytrain_preds = torch.where(train_predictions > .5, 1, 0)

In [28]:
ytrain_preds.shape

torch.Size([835, 1])

In [29]:
y_train.shape

torch.Size([835])

In [30]:
y_train = y_train.to('cuda')

In [31]:
torch.sum(ytrain_preds.squeeze(1) == y_train)/len(y_train)

tensor(1., device='cuda:0')

In [32]:
X_test, y_test = X_test.to('cuda'), y_test.to('cuda')

In [33]:
ytest_preds = torch.where(model(X_test) > .5, 1, 0)
torch.sum(ytest_preds.squeeze(1) == y_test)/len(y_test)

tensor(0.9617, device='cuda:0')

In [34]:
#loss and optimizer
class TextModel(nn.Module):
  def __init__(self):
    super().__init__()
    self.lin1 = nn.Linear(in_features = 500, out_features = 100)
    self.lin2 = nn.Linear(100, 100)
    self.lin3 = nn.Linear(100, 1)
    self.sigmoid = nn.Sigmoid()
    self.act = nn.ReLU()

  def forward(self, x):
    x = self.act(self.lin1(x))
    x = self.act(self.lin2(x))
    return self.sigmoid(self.lin3(x))




In [35]:
#training function
model = TextModel()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()

In [36]:
#torch.save(model, 'textmodel.pt')

In [37]:
from tqdm import tqdm

In [38]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [39]:
model = model.to(device)

In [40]:
#evaluate
for epoch in tqdm(range(10)):
  losses = 0
  for x,y in trainloader:
    x = x.to(device)
    y = y.to(device)
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 10 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

 40%|████      | 4/10 [00:00<00:00, 18.42it/s]

Epoch 0 Loss: 7.5143995471298695


100%|██████████| 10/10 [00:00<00:00, 18.21it/s]


In [41]:
Xt = torch.tensor(X_test.to(device), dtype = torch.float)

  Xt = torch.tensor(X_test.to(device), dtype = torch.float)


In [42]:

output = model(Xt) #model predictions

In [43]:
output[:5]

tensor([[4.5087e-07],
        [2.6038e-06],
        [1.0000e+00],
        [1.0000e+00],
        [1.0000e+00]], device='cuda:0', grad_fn=<SliceBackward0>)

In [44]:
#Converting probabilities to prediction
preds = torch.where(output >= .5, 1, 0)

In [45]:
preds.shape

torch.Size([209, 1])

In [46]:
sum(preds[:, 0] == y_test)/len(y_test)

tensor(0.9761, device='cuda:0')

### Basic RNN

![](https://upload.wikimedia.org/wikipedia/commons/thumb/b/b5/Recurrent_neural_network_unfold.svg/440px-Recurrent_neural_network_unfold.svg.png)

In [47]:
# !pip install -U torch torchtext

In [50]:
#new tokenizer
tokenizer = Tokenizer()
tokenizer.fit_on_texts(df['prompt'].values)

In [51]:
#create sequences
sequences = tokenizer.texts_to_sequences(df['prompt'].values)

In [52]:
#look at first sequence
sequences[0]

[4, 14, 5, 3094, 4337, 6, 5, 5442]

In [53]:
#compare to text
df['prompt'].values[1]

'You are Joseph Seed from Far Cry 5. Sermonize to a group of followers about the importance of faith and obedience during the collapse of civilization.'

In [54]:
sequences[1]

[4,
 14,
 4338,
 7554,
 35,
 1060,
 2721,
 203,
 7555,
 3,
 5,
 657,
 6,
 3599,
 36,
 1,
 1805,
 6,
 2722,
 2,
 1583,
 470,
 1,
 3600,
 6,
 3095]

In [55]:
#pad and make all same length
sequences = pad_sequences(sequences, maxlen=100)

In [56]:
#examine results
sequences[1].shape

(100,)

In [57]:
sequences[1]

array([   0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    4,   14, 4338,
       7554,   35, 1060, 2721,  203, 7555,    3,    5,  657,    6, 3599,
         36,    1, 1805,    6, 2722,    2, 1583,  470,    1, 3600,    6,
       3095], dtype=int32)

In [58]:
#example rnn
rnn = nn.RNN(input_size = 100,
             hidden_size = 30,
             num_layers = 1,
             batch_first = True)

In [59]:
#pass data through
sample_sequence = torch.tensor(sequences[1],
                               dtype = torch.float,
                               ).reshape(1, -1)
sample_sequence.shape

torch.Size([1, 100])

In [60]:
#output
output, hidden = rnn(sample_sequence)

In [61]:
#hidden
hidden.shape

torch.Size([1, 30])

In [62]:
#linear layer
output.shape

torch.Size([1, 30])

In [63]:
#pass through linear
lin1 = nn.Linear(in_features = 30, out_features = 1)

In [64]:
lin1(output)

tensor([[0.6444]], grad_fn=<AddmmBackward0>)

In [66]:
X_train, X_test, y_train, y_test = train_test_split(sequences, yt)

In [67]:
train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), y_train)
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), y_test)

In [68]:
trainloader = DataLoader(train_dataset, batch_size = 32)
testloader = DataLoader(test_dataset, batch_size = 32)

In [69]:
model = nn.Sequential(nn.RNN(input_size = 100, hidden_size = 50, num_layers=2),
                      nn.Linear(in_features = 50, out_features=1),
                      nn.Sigmoid())


In [70]:
ex_rnn = nn.RNN(input_size = 100, hidden_size = 50, num_layers=2)
ex_rnn(train_dataset[0][0].unsqueeze(0))

(tensor([[-4.0267e-02, -6.5108e-01,  5.6029e-01, -1.8202e-01,  7.5730e-01,
          -7.5152e-01,  4.0158e-01,  1.2469e-03,  7.9779e-01,  3.2215e-01,
          -7.4351e-01, -6.9993e-01, -1.3441e-01, -6.5982e-01,  1.8362e-02,
          -2.6788e-01, -1.9714e-01, -2.8105e-01,  7.9029e-01,  5.8012e-01,
          -6.4096e-01,  6.6497e-01,  4.7763e-01,  6.3916e-01, -2.8295e-01,
          -3.6966e-01, -3.0895e-01,  5.7973e-01, -5.4188e-01, -1.9349e-01,
          -2.6235e-01,  8.9672e-01,  1.8769e-01, -1.2924e-02,  4.4903e-01,
           4.5544e-01,  2.2435e-01,  1.1875e-01,  3.5073e-01,  7.1262e-01,
          -3.2007e-01, -3.6262e-01, -1.0571e-01, -5.7416e-01, -3.8877e-01,
           4.8972e-01, -8.4148e-04,  3.9820e-01, -4.7924e-01,  1.2552e-01]],
        grad_fn=<SqueezeBackward1>),
 tensor([[ 1.0000e+00, -1.0000e+00,  1.0000e+00, -1.0000e+00, -1.0000e+00,
           1.0000e+00,  1.0000e+00,  1.0000e+00, -1.0000e+00,  1.0000e+00,
           1.0000e+00, -1.0000e+00, -1.0000e+00,  1.0000e+00,

In [159]:
#class
class BasicRNN(nn.Module):
  def __init__(self):
    super().__init__()
    self.rnn = nn.RNN(input_size = 100,
                    hidden_size = 100,
                    num_layers = 2,
                    batch_first = True,
                    bidirectional = True)
    self.lin1 = nn.Linear(in_features = 200, out_features=128)
    self.lin2 = nn.Linear(128,64)
    self.lin3 = nn.Linear(64, 1)
    self.act = nn.ReLU()
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x, _ = self.rnn(x) #extracting important information
    x = self.act(self.lin1(x)) #multilayer perceptron -- to predict
    x = self.act(self.lin2(x))
    x = self.sigmoid(self.lin3(x))
    return x


In [160]:
#optimizer and loss
model = BasicRNN()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()

In [161]:
model = model.to(device)

In [162]:
#train
for epoch in tqdm(range(100)):
  losses = 0
  for x,y in trainloader:
    x,y = x.to(device), y.to(device)
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 100 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

  2%|▏         | 2/100 [00:00<00:09, 10.49it/s]

Epoch 0 Loss: 16.39161452651024


100%|██████████| 100/100 [00:09<00:00, 10.90it/s]


In [163]:
Xt = torch.tensor(X_test, dtype = torch.float)

In [164]:
output = model(Xt.to(device))

In [165]:
preds = torch.where(output >= .5, 1, 0)

In [166]:
#preds = output.argmax(axis = 1)

In [167]:
y_test.shape, preds.shape

(torch.Size([261]), torch.Size([261, 1]))

In [168]:
y_test = y_test.to(device)

In [169]:
sum(preds.squeeze(-1) == y_test)/len(y_test)

tensor(0.7126, device='cuda:0')

#### LSTM

In [170]:
# nn.LSTM()
class BasicLSTM(nn.Module):
  def __init__(self):
    super().__init__()
    self.rnn = nn.LSTM(input_size = 100,
                    hidden_size = 100,
                    num_layers = 1,
                    batch_first = True)

    self.lin1 = nn.Linear(in_features = 100, out_features=100)
    self.lin2 = nn.Linear(in_features = 100, out_features = 1)
    self.act = nn.ReLU()
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x, _ = self.rnn(x)
    x = self.act(self.lin1(x))
    x = self.lin2(x)
    return self.sigmoid(x)

In [177]:
model = BasicLSTM()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()
model = model.to(device)

In [178]:
#train
for epoch in range(100):
  losses = 0
  for x,y in trainloader:
    x,y = x.to(device), y.to(device)
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 10 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

Epoch 0 Loss: 16.08923327922821
Epoch 10 Loss: 12.361571580171585
Epoch 20 Loss: 11.54874774813652
Epoch 30 Loss: 11.193515717983246
Epoch 40 Loss: 10.83355501294136
Epoch 50 Loss: 10.410791963338852
Epoch 60 Loss: 10.066820561885834
Epoch 70 Loss: 9.951143264770508
Epoch 80 Loss: 9.611373007297516
Epoch 90 Loss: 9.232007339596748


In [180]:
Xt = torch.tensor(X_test, dtype = torch.float).to(device)
output = model(Xt)
preds = torch.where(output >= .5, 1, 0)
sum(preds[:, 0] == y_test)/len(y_test)

tensor(0.6437, device='cuda:0')

In [191]:
class RNN2(nn.Module):
  def __init__(self):
    super().__init__()
    self.rnn = nn.GRU(input_size = 100,
                    hidden_size = 50,
                    num_layers = 2,
                    batch_first = True)

    self.lin1 = nn.Linear(in_features = 50, out_features=100)
    self.lin2 = nn.Linear(in_features = 100, out_features = 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x, _ = self.rnn(x)
    x = self.lin1(x)
    x = self.lin2(x)
    return self.sigmoid(x)

In [192]:
model = RNN2()
optimizer = optim.Adam(model.parameters(), lr = 0.01)
loss_fn = nn.BCELoss()
model = model.to(device)

In [195]:
#train
for epoch in range(100):
  losses = 0
  for x,y in trainloader:
    x, y = x.to(device), y.to(device)
    yhat = model(x)
    y = y.reshape(-1, 1)
    loss = loss_fn(yhat, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses += loss.item()
  if epoch % 100 == 0:
    print(f'Epoch {epoch} Loss: {losses}')

Epoch 0 Loss: 2.572824880015105
Epoch 10 Loss: 1.6687806567642838
Epoch 20 Loss: 2.084140522405505
Epoch 30 Loss: 0.14215507486005663
Epoch 40 Loss: 2.5895073297433555
Epoch 50 Loss: 4.6147964131087065
Epoch 60 Loss: 1.1158297238289379
Epoch 70 Loss: 1.6419790575746447
Epoch 80 Loss: 1.0041726826311788
Epoch 90 Loss: 7.350915879011154
Epoch 100 Loss: 0.32188589729139494
Epoch 110 Loss: 0.826660448419716
Epoch 120 Loss: 0.09769756537468766
Epoch 130 Loss: 0.010040155945596041
Epoch 140 Loss: 0.005505460409189311
Epoch 150 Loss: 0.0035882416049775046
Epoch 160 Loss: 0.002542968425865322
Epoch 170 Loss: 0.0018933118432123974
Epoch 180 Loss: 0.0014562517539018494
Epoch 190 Loss: 0.001146036667118655
Epoch 200 Loss: 0.0009173696277831878
Epoch 210 Loss: 0.0007440028362964424
Epoch 220 Loss: 0.0006096175069977006
Epoch 230 Loss: 0.0005037316108048862
Epoch 240 Loss: 0.00041903144462378944
Epoch 250 Loss: 0.0003505299123727282
Epoch 260 Loss: 0.0002946198410908145
Epoch 270 Loss: 0.0002486322

In [201]:
output = model(Xt)
preds = torch.where(output >= .5, 1, 0)
sum(preds.squeeze(-1) == y_test)/len(y_test)

tensor(0.6475, device='cuda:0')