/* Copyright 2016 The Roughtime Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License. */

#include "udp_processor.h"

#include <fcntl.h>
#include <unistd.h>

#include <google/protobuf/stubs/logging.h>

#include "open_source_fillins.h"

namespace roughtime {

UdpProcessor::UdpProcessor() {
  // These never change.  We can set them up just once and reuse them forever.
  memset(recv_mmsghdrs_, 0, sizeof(recv_mmsghdrs_));
  for (size_t i = 0; i < kBatchSize; i++) {
    recv_iov_[i].iov_base = recv_buf_[i];
    recv_iov_[i].iov_len = sizeof(recv_buf_[i]);
    recv_mmsghdrs_[i].msg_hdr.msg_name = &sockaddrs_[i];
    recv_mmsghdrs_[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
    recv_mmsghdrs_[i].msg_hdr.msg_iov = &recv_iov_[i];
    recv_mmsghdrs_[i].msg_hdr.msg_iovlen = 1;
  }

  memset(send_mmsghdrs_, 0, sizeof(send_mmsghdrs_));
  for (size_t i = 0; i < kBatchSize; i++) {
    send_iov_[i].iov_base = send_buf_[i];
    send_mmsghdrs_[i].msg_hdr.msg_iov = &send_iov_[i];
    send_mmsghdrs_[i].msg_hdr.msg_iovlen = 1;
    // iov_len varies per batch, as does msg_name.
  }
}

UdpProcessor::~UdpProcessor() {}

bool UdpProcessor::AddBrokenReplyGenerator(
    std::unique_ptr<BrokenReplyGenerator> broken_reply_generator) {
  const uint16_t new_sum =
      broken_reply_generator_sum_ + broken_reply_generator->probability_1024();
  if (new_sum > 1024) {
    return false;
  }

  broken_reply_generator_sum_ = new_sum;
  broken_reply_generators_.push_back(std::move(broken_reply_generator));

  return true;
}

// static
bool UdpProcessor::MakeSocket(int port, int *out_sock, uint16_t *out_port) {
  *out_sock = -1;
  *out_port = 0;

  const int fd = socket(AF_INET6, SOCK_DGRAM, 0);
  if (fd == -1) {
    GOOGLE_PLOG(ERROR) << "socket";
    return false;
  }

  struct sockaddr_in6 sin6;
  memset(&sin6, 0, sizeof(sin6));
  sin6.sin6_family = AF_INET6;
  sin6.sin6_addr = in6addr_any;
  sin6.sin6_port = htons(port);
  if (bind(fd, reinterpret_cast<sockaddr *>(&sin6), sizeof(sin6))) {
    GOOGLE_PLOG(ERROR) << "bind";
    close(fd);
    return false;
  }

  socklen_t sin6_len = sizeof(sin6);
  if (getsockname(fd, reinterpret_cast<sockaddr *>(&sin6), &sin6_len)) {
    GOOGLE_PLOG(ERROR) << "getsockname";
    close(fd);
    return false;
  }

  *out_sock = fd;
  *out_port = ntohs(sin6.sin6_port);
  return true;
}

void UdpProcessor::PrepareResponse(struct msghdr *out_send_header,
                                   const struct msghdr &recv_header) {
  out_send_header->msg_name = recv_header.msg_name;
  out_send_header->msg_namelen = recv_header.msg_namelen;
}

void UdpProcessor::Reset() {
  // Clear out the addresses from last time.  (Without this |recvmmsg| thinks
  // we are trying to receive from these addresses.)
  memset(sockaddrs_, 0, sizeof(sockaddrs_));
  for (size_t i = 0; i < kBatchSize; i++) {
    recv_mmsghdrs_[i].msg_hdr.msg_flags = 0;
    recv_mmsghdrs_[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
  }
}

#if defined(__MACH__)
static const unsigned MSG_WAITFORONE = 0;

static int recvmmsg(int fd, struct mmsghdr *msgvec, unsigned vlen,
                    unsigned flags, struct timespec *timeout) {
  ssize_t r = recvmsg(fd, &msgvec->msg_hdr, 0);
  if (r < 0) {
    return r;
  }

  msgvec->msg_len = r;
  return 1;
}

int sendmmsg(int fd, struct mmsghdr *msgvec, unsigned vlen, unsigned flags) {
  GOOGLE_CHECK_EQ(1, vlen);
  ssize_t r = sendmsg(fd, &msgvec->msg_hdr, 0);
  if (r < 0) {
    return r;
  }

  msgvec->msg_len = r;
  return 1;
}
#endif

bool UdpProcessor::ProcessBatch(int fd, Server *server, Stats *out_stats) {
  server->Reset();
  memset(out_stats, 0, sizeof(Stats));
  Reset();

  int r;
  do {
    r = recvmmsg(fd, recv_mmsghdrs_, kBatchSize, MSG_WAITFORONE,
                 nullptr /* timeout */);
  } while (r == -1 && errno == EINTR);

  if (r < 0) {
    GOOGLE_PLOG(ERROR) << "recvmmsg";
    return false;
  } else if (r == 0) {
    return true;
  }

  out_stats->packets_in = r;

  size_t index = 0;
  for (size_t i = 0; i < static_cast<size_t>(r); i++) {
    const msghdr *recv_header = &recv_mmsghdrs_[i].msg_hdr;
    out_stats->bytes_in += recv_mmsghdrs_[i].msg_len;
    if ((recv_header->msg_flags & (MSG_CTRUNC | MSG_TRUNC)) != 0) {
      out_stats->requests_truncated++;
      continue;
    }
    if (!server->AddRequest(recv_buf_[i], recv_mmsghdrs_[i].msg_len)) {
      out_stats->requests_invalid++;
      continue;
    }

    // Fill in the destination address for this response.  The data and its
    // length will be set below.
    msghdr *send_header = &send_mmsghdrs_[index].msg_hdr;
    PrepareResponse(send_header, *recv_header);
    index++;
  }

  if (index == 0) {
    return true;
  }

  server->Sign();

  out_stats->packets_out = index;
  for (size_t i = 0; i < index; i++) {
    size_t reply_len;
    if (!server->MakeResponse(send_buf_[i], &reply_len, i)) {
      GOOGLE_LOG(ERROR) << "failed to assemble responses";
      return false;
    }
    requests_processed_++;

    uint8_t broken_output_buf[kMaxResponseSize];
    if (MaybeBreakResponse(broken_output_buf, &reply_len,
                           sizeof(broken_output_buf), send_buf_[i], reply_len,
                           recv_buf_[i], recv_mmsghdrs_[i].msg_len)) {
      GOOGLE_DCHECK_LE(reply_len, sizeof(broken_output_buf));
      memcpy(send_buf_[i], broken_output_buf, reply_len);
    }

    out_stats->bytes_out += reply_len;
    send_iov_[i].iov_len = reply_len;
  }

  int messages_sent;
  do {
    messages_sent = sendmmsg(fd, send_mmsghdrs_, index, 0);
  } while (messages_sent == -1 && errno == EINTR);

  if (messages_sent == -1) {
    GOOGLE_PLOG(ERROR) << "sendmmsg";
    return false;
  }
  GOOGLE_LOG_IF_EVERY_N_SEC(ERROR, (static_cast<size_t>(messages_sent) < index),
                            30)
      << "only " << messages_sent << " of " << index << " messages were sent";

  return true;
}

bool UdpProcessor::MaybeBreakResponse(uint8_t *out, size_t *out_len,
                                      size_t max_out_len,
                                      const uint8_t *normal_response,
                                      size_t normal_response_len,
                                      const uint8_t *request,
                                      size_t request_len) {
  const uint16_t rand = requests_processed_ & 1023;
  if (rand >= broken_reply_generator_sum_) {
    return false;
  }

  uint16_t sum = 0;
  BrokenReplyGenerator *selected = nullptr;
  for (const auto &i : broken_reply_generators_) {
    sum += i->probability_1024();
    if (rand < sum) {
      selected = i.get();
      break;
    }
  }

  return selected->Process(out, out_len, max_out_len, normal_response,
                           normal_response_len, request, request_len);
}

}  // namespace roughtime
