Collision Avoidance

Data Collection


import traitlets

import ipywidgets.widgets as widgets

from IPython.display import display

from jetbot import Camera, bgr8_to_jpeg


# connect the camera

camera = Camera.instance(width=224, height=224)

image = widgets.Image(format='jpeg', width=224, height=224)  # this width and height don't have to match the camera's

camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)


# create the dataset folders

import os


blocked_dir = 'dataset/blocked'

free_dir = 'dataset/free'


# we have this "try/except" statement because these next functions can throw an error if the directories exist already

try:

    os.makedirs(free_dir)

    os.makedirs(blocked_dir)

except FileExistsError:

    print('Directories not created because they already exist')
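
# note: a more concise standard-library alternative is exist_ok=True,

# which makes makedirs skip directories that already exist:

# os.makedirs(free_dir, exist_ok=True)

# os.makedirs(blocked_dir, exist_ok=True)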

    

# create the button widgets

button_layout = widgets.Layout(width='128px', height='64px')

free_button = widgets.Button(description='add free', button_style='success', layout=button_layout)

blocked_button = widgets.Button(description='add blocked', button_style='danger', layout=button_layout)

free_count = widgets.IntText(layout=button_layout, value=len(os.listdir(free_dir)))

blocked_count = widgets.IntText(layout=button_layout, value=len(os.listdir(blocked_dir)))



# save photos with unique filenames generated by UUID

from uuid import uuid1


def save_snapshot(directory):

    image_path = os.path.join(directory, str(uuid1()) + '.jpg')

    with open(image_path, 'wb') as f:

        f.write(image.value)


def save_free():

    global free_dir, free_count

    save_snapshot(free_dir)

    free_count.value = len(os.listdir(free_dir))

    

def save_blocked():

    global blocked_dir, blocked_count

    save_snapshot(blocked_dir)

    blocked_count.value = len(os.listdir(blocked_dir))

    

# attach the callbacks; we use a 'lambda' function to ignore the

# parameter that the on_click event would pass to our function,

# because we don't need it.

free_button.on_click(lambda x: save_free())

blocked_button.on_click(lambda x: save_blocked())


# display the image and the buttons

display(image)

display(widgets.HBox([free_count, free_button]))

display(widgets.HBox([blocked_count, blocked_button]))


Start taking photos. Tips for collecting useful data: vary the robot's orientation, the lighting, and the obstacles and floor surfaces between shots so the model generalizes.

# when you're done taking photos, stop the camera and compress the data into a single archive

camera.stop()

!zip -r -q dataset.zip dataset

Train Model

Train the model with PyTorch:

import torch

import torch.optim as optim

import torch.nn.functional as F

import torchvision

import torchvision.datasets as datasets

import torchvision.models as models

import torchvision.transforms as transforms


# unzip the dataset images

#!unzip -q dataset.zip


# create the dataset

dataset = datasets.ImageFolder(

    'dataset',

    transforms.Compose([

        transforms.ColorJitter(0.1, 0.1, 0.1, 0.1),

        transforms.Resize((224, 224)),

        transforms.ToTensor(),

        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

    ])

)
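
# note: ImageFolder assigns class indices alphabetically by folder name,

# so 'blocked' maps to index 0 and 'free' to index 1; the live demo below

# relies on this when it reads the blocked probability from output index 0

print(dataset.class_to_idx)  # {'blocked': 0, 'free': 1}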


# split the data into training and test sets (hold out 50 images for testing)

train_dataset, test_dataset = torch.utils.data.random_split(dataset, [len(dataset) - 50, 50])



# create the data loaders

train_loader = torch.utils.data.DataLoader(

    train_dataset,

    batch_size=8,

    shuffle=True,

    num_workers=0

)


test_loader = torch.utils.data.DataLoader(

    test_dataset,

    batch_size=8,

    shuffle=True,

    num_workers=0

)



# define the neural network: transfer learning with alexnet

model = models.alexnet(pretrained=True)

# the alexnet model was originally trained on a dataset with 1000 class labels, but our dataset has only two!

# we will replace the final layer with a new, untrained layer that has just two outputs.

model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)



# alternative: use resnet18 instead of alexnet
#model = models.resnet18(pretrained=True)

## the resnet18 model was originally trained on a dataset with 1000 class labels, but our dataset has only two!

## we will replace the final layer with a new, untrained layer that has just two outputs.

#model.fc = torch.nn.Linear(512, 2)
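
# optional sanity check (not part of the original notebook): the replaced

# head should now produce two logits per image

#dummy = torch.zeros((1, 3, 224, 224))

#print(model(dummy).shape)  # expected: torch.Size([1, 2])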



# select the CUDA device

device = torch.device('cuda')

# move the model to the GPU

model = model.to(device)


# train the model

NUM_EPOCHS = 30

BEST_MODEL_PATH = 'best_model.pth'

#BEST_MODEL_PATH = 'best_model_resnet18.pth'

best_accuracy = 0.0


optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)


for epoch in range(NUM_EPOCHS):

    

    for images, labels in iter(train_loader):

        images = images.to(device)

        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(images)

        loss = F.cross_entropy(outputs, labels)

        loss.backward()

        optimizer.step()

    

    test_error_count = 0.0

    for images, labels in iter(test_loader):

        images = images.to(device)

        labels = labels.to(device)

        outputs = model(images)

        test_error_count += float(torch.sum(torch.abs(labels - outputs.argmax(1))))

    

    test_accuracy = 1.0 - float(test_error_count) / float(len(test_dataset))

    print('%d: %f' % (epoch, test_accuracy))

    if test_accuracy > best_accuracy:

        torch.save(model.state_dict(), BEST_MODEL_PATH)

        best_accuracy = test_accuracy

PyTorch to TensorRT

If torch2trt is already installed, you can skip this installation step.

cd $HOME

git clone https://github.com/NVIDIA-AI-IOT/torch2trt

cd torch2trt

sudo python3 setup.py install
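
To check whether torch2trt is already installed, this import should exit without error:

python3 -c "import torch2trt"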

Convert the model:

import torch

import torchvision


# load the trained resnet18 PyTorch checkpoint (this assumes the resnet18 variant was trained above)

model = torchvision.models.resnet18(pretrained=False)

model.fc = torch.nn.Linear(512, 2)

model = model.cuda().eval().half()  # GPU, eval mode, FP16 for the TensorRT conversion

model.load_state_dict(torch.load('best_model_resnet18.pth'))

device = torch.device('cuda')


from torch2trt import torch2trt

data = torch.zeros((1, 3, 224, 224)).cuda().half()

model_trt = torch2trt(model, [data], fp16_mode=True)


# export the TensorRT model

torch.save(model_trt.state_dict(), 'best_model_trt.pth')
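
To load the converted model later (for example in the live demo), torch2trt provides a TRTModule that restores the saved engine state; a minimal sketch of the loading side:

from torch2trt import TRTModule

model_trt = TRTModule()

model_trt.load_state_dict(torch.load('best_model_trt.pth'))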


Collision Avoidance (Live Demo)

import torch

import torchvision


model = torchvision.models.alexnet(pretrained=False)

model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)


model.load_state_dict(torch.load('best_model.pth'))


device = torch.device('cuda')

model = model.to(device)



'''

We have now loaded our model, but there is a slight issue: the format we trained the model on does not exactly match the format the camera provides. To fix this, we need some preprocessing, which involves the following steps:

1. Convert from BGR to RGB

2. Convert from HWC layout to CHW layout

3. Normalize using the same parameters as during training (our camera provides values in the [0, 255] range, while the training images were loaded in the [0, 1] range, so we need to scale by 255.0)

4. Transfer the data from CPU memory to GPU memory

5. Add a batch dimension

'''


import cv2

import numpy as np


mean = 255.0 * np.array([0.485, 0.456, 0.406])

stdev = 255.0 * np.array([0.229, 0.224, 0.225])


normalize = torchvision.transforms.Normalize(mean, stdev)


def preprocess(camera_value):

    global device, normalize

    x = camera_value

    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)

    x = x.transpose((2, 0, 1))

    x = torch.from_numpy(x).float()

    x = normalize(x)

    x = x.to(device)

    x = x[None, ...]

    return x



import traitlets

from IPython.display import display

import ipywidgets.widgets as widgets

from jetbot import Camera, bgr8_to_jpeg


camera = Camera.instance(width=224, height=224)

image = widgets.Image(format='jpeg', width=224, height=224)

blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')

speed_slider = widgets.FloatSlider(description='speed', min=0.0, max=0.5, value=0.0, step=0.01, orientation='horizontal')


camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)


display(widgets.VBox([widgets.HBox([image, blocked_slider]), speed_slider]))



# create the robot instance that drives the motors

from jetbot import Robot


robot = Robot()


import torch.nn.functional as F

import time


def update(change):

    global blocked_slider, robot

    x = change['new']  # the new camera frame from the callback (BGR, HWC, uint8)

    x = preprocess(x)

    y = model(x)

    

    # we apply the `softmax` function to normalize the output vector so it sums to 1 (which makes it a probability distribution)

    y = F.softmax(y, dim=1)

    

    prob_blocked = float(y.flatten()[0])  # index 0 is 'blocked' (ImageFolder's alphabetical class order)

    

    blocked_slider.value = prob_blocked

    

    if prob_blocked < 0.5:

        robot.forward(speed_slider.value)

    else:

        robot.left(speed_slider.value)

    

    time.sleep(0.001)

        

# we call the function once to initialize

update({'new': camera.value})


# generate new commands with each new camera frame

# this attaches the 'update' function to the 'value' traitlet of our camera

camera.observe(update, names='value')
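
To stop the behavior, detach the camera callback and halt the motors (a minimal wrap-up using the same traitlets/jetbot APIs as above):

camera.unobserve(update, names='value')

time.sleep(0.1)  # let any in-flight frame finish processing

robot.stop()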