Collision Avoidance
Data Collection
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display
from jetbot import Camera, bgr8_to_jpeg
# Connect to the camera
camera = Camera.instance(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224) # this width and height don't have to match the camera's
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)
# Create the dataset directories
import os
blocked_dir = 'dataset/blocked'
free_dir = 'dataset/free'
# we have this "try/except" statement because these next functions can throw an error if the directories exist already
try:
    os.makedirs(free_dir)
    os.makedirs(blocked_dir)
except FileExistsError:
    print('Directories not created because they already exist')
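As a side note, on Python 3.2+ the same effect can be had without the try/except by passing exist_ok=True, which silently accepts directories that already exist:
os.makedirs(free_dir, exist_ok=True)
os.makedirs(blocked_dir, exist_ok=True)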
# Create the button widgets
button_layout = widgets.Layout(width='128px', height='64px')
free_button = widgets.Button(description='add free', button_style='success', layout=button_layout)
blocked_button = widgets.Button(description='add blocked', button_style='danger', layout=button_layout)
free_count = widgets.IntText(layout=button_layout, value=len(os.listdir(free_dir)))
blocked_count = widgets.IntText(layout=button_layout, value=len(os.listdir(blocked_dir)))
# Save snapshots, using UUIDs for unique file names
from uuid import uuid1
def save_snapshot(directory):
    image_path = os.path.join(directory, str(uuid1()) + '.jpg')
    with open(image_path, 'wb') as f:
        f.write(image.value)

def save_free():
    global free_dir, free_count
    save_snapshot(free_dir)
    free_count.value = len(os.listdir(free_dir))

def save_blocked():
    global blocked_dir, blocked_count
    save_snapshot(blocked_dir)
    blocked_count.value = len(os.listdir(blocked_dir))
# Attach the callbacks. We use a 'lambda' function to ignore the
# parameter that the on_click event would provide to our function,
# because we don't need it.
free_button.on_click(lambda x: save_free())
blocked_button.on_click(lambda x: save_blocked())
# Display the image and the buttons
display(image)
display(widgets.HBox([free_count, free_button]))
display(widgets.HBox([blocked_count, blocked_button]))
Start taking photos. Tips for collecting good data:
Try different orientations
Try different lighting
Try different obstacle/collision types: walls, ledges, objects
Try floors/objects with different textures: patterned, smooth, glass, etc.
# When you are done taking photos, stop the camera and compress the data into a single archive
camera.stop()
!zip -r -q dataset.zip dataset
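Before training, it is worth confirming how many images ended up in each class; a quick sketch, assuming the dataset/ layout created above (aim for a roughly balanced count):
import os
for d in ['dataset/free', 'dataset/blocked']:
    print(d, len(os.listdir(d)), 'images')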
Train Model
Train on the collected data with PyTorch.
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms
# Unzip the dataset archive (uncomment if you are starting from dataset.zip)
#!unzip -q dataset.zip
# Create the dataset
dataset = datasets.ImageFolder(
    'dataset',
    transforms.Compose([
        transforms.ColorJitter(0.1, 0.1, 0.1, 0.1),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
)
# Split the data into a training set and a 50-image test set
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [len(dataset) - 50, 50])
# Create the data loaders
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)
# Define the neural network: transfer learning with a pretrained alexnet
model = models.alexnet(pretrained=True)
# The alexnet model was originally trained on a dataset with 1000 class labels, but our dataset has only two!
# We replace the final layer with a new, untrained layer that has just two outputs.
model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)
#model = models.resnet18(pretrained=True)
## The resnet18 model was likewise trained for 1000 class labels, but our dataset has only two!
## We replace its final layer with a new, untrained two-output layer.
#model.fc = torch.nn.Linear(512, 2)
# Select the CUDA device (GPU)
device = torch.device('cuda')
# Move the model to the GPU
model = model.to(device)
# Train the model
NUM_EPOCHS = 30
BEST_MODEL_PATH = 'best_model.pth'
#BEST_MODEL_PATH = 'best_model_resnet18.pth'
best_accuracy = 0.0
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
for epoch in range(NUM_EPOCHS):
    for images, labels in iter(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()
    test_error_count = 0.0
    for images, labels in iter(test_loader):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        test_error_count += float(torch.sum(torch.abs(labels - outputs.argmax(1))))
    test_accuracy = 1.0 - float(test_error_count) / float(len(test_dataset))
    print('%d: %f' % (epoch, test_accuracy))
    if test_accuracy > best_accuracy:
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_accuracy = test_accuracy
PyTorch to TensorRT
If torch2trt is already installed, you can skip this installation step:
cd $HOME
git clone https://github.com/NVIDIA-AI-IOT/torch2trt
cd torch2trt
sudo python3 setup.py install
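To check whether torch2trt is already available, a minimal sketch that simply tries the import:
try:
    from torch2trt import torch2trt
    print('torch2trt is available')
except ImportError:
    print('torch2trt not found; run the install steps above')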
Convert the model:
import torch
import torchvision
# Load the trained resnet18 PyTorch weights
model = torchvision.models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(512, 2)
model = model.cuda().eval().half()
model.load_state_dict(torch.load('best_model_resnet18.pth'))
device = torch.device('cuda')
from torch2trt import torch2trt
data = torch.zeros((1, 3, 224, 224)).cuda().half()
model_trt = torch2trt(model, [data], fp16_mode=True)
# Save the TensorRT-optimized model
torch.save(model_trt.state_dict(), 'best_model_trt.pth')
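To use the optimized model later, it can be loaded back through torch2trt's TRTModule; a brief sketch using the file saved above:
import torch
from torch2trt import TRTModule
model_trt = TRTModule()
model_trt.load_state_dict(torch.load('best_model_trt.pth'))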
Collision Avoidance
import torch
import torchvision
model = torchvision.models.alexnet(pretrained=False)
model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)
model.load_state_dict(torch.load('best_model.pth'))
device = torch.device('cuda')
model = model.to(device)
'''
We have now loaded our model, but there is a small problem: the format we trained the model on does not exactly match the camera's format. We need some preprocessing to fix this, which involves the following steps:
1. Convert from BGR to RGB
2. Convert from HWC layout to CHW layout
3. Normalize with the same parameters we used during training (the camera provides values in the range [0, 255], while the training images were loaded in [0, 1], so we also scale by 255.0)
4. Transfer the data from CPU memory to GPU memory
5. Add a batch dimension
'''
import cv2
import numpy as np
mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])
normalize = torchvision.transforms.Normalize(mean, stdev)
def preprocess(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import Camera, bgr8_to_jpeg
camera = Camera.instance(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224)
blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')
speed_slider = widgets.FloatSlider(description='speed', min=0.0, max=0.5, value=0.0, step=0.01, orientation='horizontal')
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)
display(widgets.VBox([widgets.HBox([image, blocked_slider]), speed_slider]))
# Create the robot instance that drives the motors
from jetbot import Robot
robot = Robot()
import torch.nn.functional as F
import time
def update(change):
    global blocked_slider, robot
    x = change['new']
    x = preprocess(x)
    y = model(x)
    # we apply the `softmax` function to normalize the output vector so it sums to 1 (which makes it a probability distribution)
    y = F.softmax(y, dim=1)
    # index 0 is 'blocked' because ImageFolder orders class folders alphabetically
    prob_blocked = float(y.flatten()[0])
    blocked_slider.value = prob_blocked
    if prob_blocked < 0.5:
        robot.forward(speed_slider.value)
    else:
        robot.left(speed_slider.value)
    time.sleep(0.001)
# we call the function once to initialize
update({'new': camera.value})
# Generate new commands with each new camera frame:
# this attaches the 'update' function to the 'value' traitlet of our camera
camera.observe(update, names='value')
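When you are finished, detach the callback and stop the motors so the robot does not keep driving; a short sketch reusing the objects created above:
import time
camera.unobserve(update, names='value')
time.sleep(0.1)  # give any in-flight frame a moment to finish processing
robot.stop()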